camel-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Raul Kripalani <r...@evosent.com>
Subject Re: svn commit: r1431152 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
Date Thu, 10 Jan 2013 12:19:06 GMT
Ah, I overlooked this option. I'll fix it in a new commit soon.

Thanks for the report, Claus!

Raúl.

On Thu, Jan 10, 2013 at 12:15 PM, Claus Ibsen <claus.ibsen@gmail.com> wrote:

> Hi
>
> I think we should honor the option replyToCacheLevelName, so people
> can configure the cache level.
> Right now the cache level is hardcoded to CACHE_CONSUMER.
>
> Some brokers may not work well with that, so it is better to give end
> users the option to set replyToCacheLevelName.
> The default can still be CACHE_CONSUMER.
>
>
>
> On Thu, Jan 10, 2013 at 12:49 AM,  <raulk@apache.org> wrote:
> > Author: raulk
> > Date: Wed Jan  9 23:49:26 2013
> > New Revision: 1431152
> >
> > URL: http://svn.apache.org/viewvc?rev=1431152&view=rev
> > Log:
> > CAMEL-5865 Enhanced concurrent consumers support for JMS producers using
> Temp Reply Queue for replies
> >
> > Added:
> >
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> > Modified:
> >
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> >
> > Modified:
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1431152&r1=1431151&r2=1431152&view=diff
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> (original)
> > +++
> camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> Wed Jan  9 23:49:26 2013
> > @@ -16,7 +16,10 @@
> >   */
> >  package org.apache.camel.component.jms.reply;
> >
> > +import java.util.concurrent.atomic.AtomicBoolean;
> > +
> >  import javax.jms.Destination;
> > +import javax.jms.ExceptionListener;
> >  import javax.jms.JMSException;
> >  import javax.jms.Message;
> >  import javax.jms.Session;
> > @@ -37,11 +40,23 @@ import org.springframework.jms.support.d
> >   * @version
> >   */
> >  public class TemporaryQueueReplyManager extends ReplyManagerSupport {
> > -
> > +
> > +    final TemporaryReplyQueueDestinationResolver destResolver = new
> TemporaryReplyQueueDestinationResolver();
> > +
> >      public TemporaryQueueReplyManager(CamelContext camelContext) {
> >          super(camelContext);
> >      }
> >
> > +    @Override
> > +    public Destination getReplyTo() {
> > +        try {
> > +            destResolver.destinationReady();
> > +        } catch (InterruptedException e) {
> > +            log.warn("Interrupted while waiting for JMSReplyTo
> destination refresh", e);
> > +        }
> > +        return super.getReplyTo();
> > +    }
> > +
> >      public String registerReply(ReplyManager replyManager, Exchange
> exchange, AsyncCallback callback,
> >                                  String originalCorrelationId, String
> correlationId, long requestTimeout) {
> >          // add to correlation map
> > @@ -90,15 +105,7 @@ public class TemporaryQueueReplyManager
> >          DefaultMessageListenerContainer answer = new
> DefaultJmsMessageListenerContainer(endpoint);
> >
> >          answer.setDestinationName("temporary");
> > -        answer.setDestinationResolver(new DestinationResolver() {
> > -            public Destination resolveDestinationName(Session session,
> String destinationName,
> > -                                                      boolean
> pubSubDomain) throws JMSException {
> > -                // use a temporary queue to gather the reply message
> > -                TemporaryQueue queue = session.createTemporaryQueue();
> > -                setReplyTo(queue);
> > -                return queue;
> > -            }
> > -        });
> > +        answer.setDestinationResolver(destResolver);
> >          answer.setAutoStartup(true);
> >          if (endpoint.getMaxMessagesPerTask() >= 0) {
> >
>  answer.setMaxMessagesPerTask(endpoint.getMaxMessagesPerTask());
> > @@ -113,6 +120,9 @@ public class TemporaryQueueReplyManager
> >
>  answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
> >          }
> >          answer.setConnectionFactory(endpoint.getConnectionFactory());
> > +        // we use CACHE_CONSUMER to cling to the consumer as long as we
> can, since we can only consume
> > +        // msgs from the JMS Connection that created the temp
> destination in the first place
> > +
>  answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
> >          String clientId = endpoint.getClientId();
> >          if (clientId != null) {
> >              clientId += ".CamelReplyManager";
> > @@ -121,11 +131,10 @@ public class TemporaryQueueReplyManager
> >
> >          // we cannot do request-reply over JMS with transaction
> >          answer.setSessionTransacted(false);
> > -
> > +
> >          // other optional properties
> > -        if (endpoint.getExceptionListener() != null) {
> > -
>  answer.setExceptionListener(endpoint.getExceptionListener());
> > -        }
> > +        answer.setExceptionListener(new
> TemporaryReplyQueueExceptionListener(destResolver,
> endpoint.getExceptionListener()));
> > +
> >          if (endpoint.getErrorHandler() != null) {
> >              answer.setErrorHandler(endpoint.getErrorHandler());
> >          } else {
> > @@ -144,8 +153,9 @@ public class TemporaryQueueReplyManager
> >              answer.setTaskExecutor(endpoint.getTaskExecutor());
> >          }
> >
> > -        // setup a bean name which is used ny Spring JMS as the thread
> name
> > -        String name = "TemporaryQueueReplyManager[" +
> answer.getDestinationName() + "]";
> > +        // setup a bean name which is used by Spring JMS as the thread
> name
> > +        // use the name of the request destination
> > +        String name = "TemporaryQueueReplyManager[" +
> endpoint.getDestinationName() + "]";
> >          answer.setBeanName(name);
> >
> >          if (answer.getConcurrentConsumers() > 1) {
> > @@ -156,4 +166,60 @@ public class TemporaryQueueReplyManager
> >          return answer;
> >      }
> >
> > +    private final class TemporaryReplyQueueExceptionListener implements
> ExceptionListener {
> > +        private final TemporaryReplyQueueDestinationResolver
> destResolver;
> > +        private final ExceptionListener delegate;
> > +
> > +        private
> TemporaryReplyQueueExceptionListener(TemporaryReplyQueueDestinationResolver
> destResolver,
> > +                ExceptionListener delegate) {
> > +            this.destResolver = destResolver;
> > +            this.delegate = delegate;
> > +        }
> > +
> > +        @Override
> > +        public void onException(JMSException exception) {
> > +            // capture exceptions, and schedule a refresh of the
> ReplyTo destination
> > +            log.warn("Exception inside the DMLC for Temporary ReplyTo
> Queue for destination " + endpoint.getDestinationName() +
> > +                       ", refreshing ReplyTo destination", exception);
> > +            destResolver.scheduleRefresh();
> > +            // serve as a proxy for any exception listener the user may
> have set explicitly
> > +            if (delegate != null) {
> > +                delegate.onException(exception);
> > +            }
> > +        }
> > +
> > +    }
> > +
> > +    private final class TemporaryReplyQueueDestinationResolver
> implements DestinationResolver {
> > +        private TemporaryQueue queue;
> > +        private AtomicBoolean refreshWanted = new AtomicBoolean(false);
> > +
> > +        public Destination resolveDestinationName(Session session,
> String destinationName, boolean pubSubDomain)
> > +                throws JMSException {
> > +            // use a temporary queue to gather the reply message
> > +            synchronized (refreshWanted) {
> > +                if (queue == null || refreshWanted.compareAndSet(true,
> false)) {
> > +                    queue = session.createTemporaryQueue();
> > +                    setReplyTo(queue);
> > +                    log.debug("Refreshed Temporary ReplyTo Queue. New
> queue: " + queue.getQueueName());
> > +                    refreshWanted.notifyAll();
> > +                }
> > +            }
> > +            return queue;
> > +        }
> > +
> > +        public void scheduleRefresh() {
> > +            refreshWanted.set(true);
> > +        }
> > +
> > +        public void destinationReady() throws InterruptedException {
> > +            if (refreshWanted.get()) {
> > +                synchronized (refreshWanted) {
> > +                    log.debug("Waiting for new Temp ReplyTo destination
> to be assigned to continue");
> > +                    refreshWanted.wait();
> > +                }
> > +            }
> > +        }
> > +    }
> > +
> >  }
> >
> > Added:
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> > URL:
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java?rev=1431152&view=auto
> >
> ==============================================================================
> > ---
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> (added)
> > +++
> camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
> Wed Jan  9 23:49:26 2013
> > @@ -0,0 +1,143 @@
> > +/**
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements.  See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License.  You may obtain a copy of the License at
> > + *
> > + *      http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +package org.apache.camel.component.jms;
> > +
> > +import java.util.Map;
> > +import java.util.concurrent.Callable;
> > +import java.util.concurrent.ConcurrentHashMap;
> > +import java.util.concurrent.ExecutorService;
> > +import java.util.concurrent.Executors;
> > +import java.util.concurrent.atomic.AtomicInteger;
> > +
> > +import javax.jms.ConnectionFactory;
> > +
> > +import org.apache.activemq.ActiveMQConnectionFactory;
> > +import org.apache.activemq.broker.BrokerService;
> > +import org.apache.activemq.broker.TransportConnection;
> > +import org.apache.activemq.pool.PooledConnectionFactory;
> > +import org.apache.camel.CamelContext;
> > +import org.apache.camel.Exchange;
> > +import org.apache.camel.Processor;
> > +import org.apache.camel.builder.RouteBuilder;
> > +import org.apache.camel.test.junit4.CamelTestSupport;
> > +import org.junit.Test;
> > +
> > +import static
> org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
> > +
> > +/**
> > + * Reliability tests for JMS TempQueue Reply Manager with multiple
> consumers.
> > + * @version
> > + */
> > +public class JmsRequestReplyTempQueueMultipleConsumersTest extends
> CamelTestSupport {
> > +
> > +    private Map<String, AtomicInteger> msgsPerThread = new
> ConcurrentHashMap<String, AtomicInteger>();
> > +    private BrokerService broker;
> > +    private PooledConnectionFactory connectionFactory;
> > +
> > +    @Test
> > +    public void testMultipleConsumingThreads() throws Exception {
> > +        doSendMessages(1000, 5);
> > +        assertTrue("Expected multiple consuming threads, but only
> found: " +  msgsPerThread.keySet().size(),
> > +                msgsPerThread.keySet().size() > 1);
> > +    }
> > +
> > +    @Test
> > +    public void testTempQueueRefreshed() throws Exception {
> > +        doSendMessages(500, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +        connectionFactory.clear();
> > +        doSendMessages(100, 5);
> > +    }
> > +
> > +    private void doSendMessages(int files, int poolSize) throws
> Exception {
> > +        getMockEndpoint("mock:result").expectedMessageCount(files);
> > +        getMockEndpoint("mock:result").expectsNoDuplicates(body());
> > +
> > +        ExecutorService executor =
> Executors.newFixedThreadPool(poolSize);
> > +        for (int i = 0; i < files; i++) {
> > +            final int index = i;
> > +            executor.submit(new Callable<Object>() {
> > +                public Object call() throws Exception {
> > +                    template.sendBody("seda:start", "Message " + index);
> > +                    return null;
> > +                }
> > +            });
> > +        }
> > +
> > +        assertMockEndpointsSatisfied();
> > +        resetMocks();
> > +        executor.shutdownNow();
> > +    }
> > +
> > +    public void startBroker() throws Exception {
> > +        String brokerName = "test-broker-" + System.currentTimeMillis();
> > +        String brokerUri = "vm://" + brokerName;
> > +        broker = new BrokerService();
> > +        broker.setBrokerName(brokerName);
> > +        broker.setBrokerId(brokerName);
> > +        broker.addConnector(brokerUri);
> > +        broker.setPersistent(false);
> > +        broker.start();
> > +    }
> > +
> > +    protected CamelContext createCamelContext() throws Exception {
> > +        CamelContext camelContext = super.createCamelContext();
> > +        //startBroker();
> > +
> > +        connectionFactory = (PooledConnectionFactory)
> CamelJmsTestHelper.createConnectionFactory();
> > +        camelContext.addComponent("jms",
> jmsComponentAutoAcknowledge(connectionFactory));
> > +
> > +        return camelContext;
> > +    }
> > +
> > +    @Override
> > +    protected RouteBuilder createRouteBuilder() throws Exception {
> > +        return new RouteBuilder() {
> > +            @Override
> > +            public void configure() throws Exception {
> > +                from("seda:start")
> > +
>  .inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10")
> > +                    .process(new Processor() {
> > +                        @Override
> > +                        public void process(Exchange exchange) throws
> Exception {
> > +                            String threadName =
> Thread.currentThread().getName();
> > +                            synchronized (msgsPerThread) {
> > +                               AtomicInteger count =
> msgsPerThread.get(threadName);
> > +                               if (count == null) {
> > +                                   count = new AtomicInteger(0);
> > +                                   msgsPerThread.put(threadName, count);
> > +                               }
> > +                               count.incrementAndGet();
> > +                            }
> > +                        }
> > +                    })
> > +                    .to("mock:result");
> > +
> > +
>  from("jms:queue:foo?concurrentConsumers=20&recoveryInterval=10")
> > +                    .setBody(simple("Reply >>> ${body}"));
> > +            }
> > +        };
> > +    }
> > +
> > +}
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> Email: cibsen@redhat.com
> Web: http://fusesource.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message