camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r1368413 [5/5] - in /camel/trunk: components/ components/camel-sjms/ components/camel-sjms/src/ components/camel-sjms/src/main/ components/camel-sjms/src/main/java/ components/camel-sjms/src/main/java/org/ components/camel-sjms/src/main/jav...
Date Thu, 02 Aug 2012 10:56:44 GMT
Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Test;
+
+public class TransactedQueueProducerTest extends CamelTestSupport {
+    
+    private static final String TEST_DESTINATION_NAME = "transacted.test.queue";
+
+    @Produce
+    protected ProducerTemplate template;
+    protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.useJmx=false");
+    private Connection connection;
+    private Session session;
+    
+    public TransactedQueueProducerTest() {
+    }
+
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (session != null) {
+            session.close();
+        }
+        if (connection != null) {
+            connection.stop();
+        }
+        super.tearDown();
+    }
+
+    @Test
+    public void testTransactedQueueProducer() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(session, TEST_DESTINATION_NAME);
+        assertNotNull(mc);
+        final String expectedBody = "Hello World!";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("direct:start.transacted", expectedBody);
+        mc.receive(5000);
+        session.rollback();
+        Message message = mc.receive(5000);
+        session.commit();
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        
+        TextMessage tm = (TextMessage) message;
+        String text = tm.getText();
+        assertNotNull(text);
+        
+        template.sendBody("direct:finish", text);
+        
+        mock.assertIsSatisfied();
+        mc.close();
+
+    }
+
+    @Test
+    public void testTransactedQueueProducerAsynchronousOverride() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(session, TEST_DESTINATION_NAME);
+        assertNotNull(mc);
+        final String expectedBody = "Hello World!";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("direct:start.transacted.async.override", expectedBody);
+        mc.receive(5000);
+        session.rollback();
+        Message message = mc.receive(5000);
+        session.commit();
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+        
+        TextMessage tm = (TextMessage) message;
+        String text = tm.getText();
+        assertNotNull(text);
+        
+        template.sendBody("direct:finish", text);
+        
+        mock.assertIsSatisfied();
+        mc.close();
+
+    }
+
+    @Test
+    public void testTransactedQueueProducerFailed() throws Exception {
+        MessageConsumer mc = JmsObjectFactory.createQueueConsumer(session, TEST_DESTINATION_NAME);
+        assertNotNull(mc);
+        final String expectedBody = "Transaction Failed";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("direct:start.transacted", expectedBody);
+        mc.receive(5000);
+        session.rollback();
+        Enumeration<?> enumeration = session.createBrowser(session.createQueue(TEST_DESTINATION_NAME)).getEnumeration();
+        while (enumeration.hasMoreElements()) {
+            TextMessage tm = (TextMessage) enumeration.nextElement();
+            String text = tm.getText();
+            log.info("Element from Enumeration: {}", text);  
+            assertNotNull(text);
+            
+            template.sendBody("direct:finish", text);
+        }
+        
+        mock.assertIsSatisfied();
+        mc.close();
+    }
+    
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+     *
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        SjmsComponent component = new SjmsComponent();
+        component.setMaxConnections(1);
+        component.setConnectionFactory(connectionFactory);
+        camelContext.addComponent("sjms", component);
+        return camelContext;
+    }
+
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     *
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                
+                from("direct:start.transacted")
+                    .to("sjms:queue:" + TEST_DESTINATION_NAME + "?transacted=true");
+                
+                from("direct:start.transacted.async.override")
+                    .to("sjms:queue:" + TEST_DESTINATION_NAME + "?transacted=true&synchronous=false");
+                
+                from("direct:finish")
+                    .to("log:test.log.1?showBody=true", "mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+/**
+ * TODO Add Class documentation for JmsTestSupport
+ *
+ */
+public class JmsTestSupport extends CamelTestSupport {
+    private static final String BROKER_URI = "tcp://localhost:33333";
+    @Produce
+    protected ProducerTemplate template;
+    private BrokerService broker;
+    private Connection connection;
+    private Session session;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        super.doPreSetup();
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setPersistent(false);
+        broker.deleteAllMessages();
+        broker.addConnector(BROKER_URI);
+        broker.start();
+        super.setUp();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        DefaultCamelContext dcc = (DefaultCamelContext)context;
+        while (!dcc.isStopped()) {
+            log.info("Waiting on the Camel Context to stop");
+        }
+        log.info("Closing JMS Session");
+        if (getSession() != null) {
+            getSession().close();
+            setSession(null);
+        }
+        log.info("Closing JMS Connection");
+        if (connection != null) {
+            connection.stop();
+            connection = null;
+        }
+        log.info("Stopping the ActiveMQ Broker");
+        if (broker != null) {
+            broker.stop();
+            broker = null;
+        }
+    }
+    
+    /*
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createCamelContext()
+     *
+     * @return
+     * @throws Exception
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URI);
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        SjmsComponent component = new SjmsComponent();
+        component.setMaxConnections(1);
+        component.setConnectionFactory(connectionFactory);
+        camelContext.addComponent("sjms", component);
+        return camelContext;
+    }
+    
+    public void setSession(Session session) {
+        this.session = session;
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncComponent.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Test component that creates {@link MyAsyncEndpoint}s whose reply text is
+ * derived from the part of the endpoint URI that follows the scheme.
+ */
+public class MyAsyncComponent extends DefaultComponent {
+
+    /**
+     * Creates an endpoint whose reply is built from {@code remaining} and
+     * applies the remaining URI parameters as endpoint properties.
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        MyAsyncEndpoint answer = new MyAsyncEndpoint(uri, this);
+        answer.setReply(prepareReply(remaining));
+        setProperties(answer, parameters);
+        return answer;
+    }
+
+    /**
+     * Turns a colon separated value into a reply sentence: each segment is
+     * capitalised and the segments are joined with single spaces, e.g.
+     * {@code "bye:world"} becomes {@code "Bye World"}.
+     *
+     * @param value the colon separated words taken from the endpoint URI
+     * @return the words capitalised and separated by spaces; empty segments
+     *         are ignored
+     */
+    private String prepareReply(String value) {
+        // to make URIs valid we make the conventions of using ':' for ' ' and
+        // capitalize words
+        StringBuilder reply = new StringBuilder();
+        for (String word : value.split(":")) {
+            if (word.isEmpty()) {
+                // an empty value or a doubled ':' yields an empty segment,
+                // which substring(0, 1) cannot handle
+                continue;
+            }
+            if (reply.length() > 0) {
+                reply.append(' ');
+            }
+            reply.append(word.substring(0, 1).toUpperCase(Locale.ENGLISH)).append(word.substring(1));
+        }
+        return reply.toString();
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncEndpoint.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
+
+/**
+ * Producer-only test endpoint. It holds the reply text, a delay (25 by
+ * default) and the number of first attempts that should fail; consumers are
+ * not supported.
+ */
+public class MyAsyncEndpoint extends DefaultEndpoint {
+
+    private String reply;
+    private long delay = 25;
+    private int failFirstAttempts;
+
+    public MyAsyncEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    /**
+     * Creates a {@link MyAsyncProducer} for this endpoint, wrapped in a
+     * {@link SynchronousDelegateProducer} when the endpoint is synchronous.
+     */
+    public Producer createProducer() throws Exception {
+        Producer asyncProducer = new MyAsyncProducer(this);
+        // a synchronous endpoint must not hand out the bare async producer
+        return isSynchronous() ? new SynchronousDelegateProducer(asyncProducer) : asyncProducer;
+    }
+
+    /**
+     * Always fails: this endpoint cannot be consumed from.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("Consumer not supported");
+    }
+
+    /** @return {@code false}; the endpoint does not declare itself a singleton */
+    public boolean isSingleton() {
+        return false;
+    }
+
+    /** @return the reply text configured for this endpoint */
+    public String getReply() {
+        return reply;
+    }
+
+    /** @param reply the reply text to use */
+    public void setReply(String reply) {
+        this.reply = reply;
+    }
+
+    /** @return the configured delay */
+    public long getDelay() {
+        return delay;
+    }
+
+    /** @param delay the delay to use */
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    /** @return how many of the first attempts should fail */
+    public int getFailFirstAttempts() {
+        return failFirstAttempts;
+    }
+
+    /** @param failFirstAttempts how many of the first attempts should fail */
+    public void setFailFirstAttempts(int failFirstAttempts) {
+        this.failFirstAttempts = failFirstAttempts;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyAsyncProducer.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class MyAsyncProducer extends DefaultAsyncProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MyAsyncProducer.class);
+    private final ExecutorService executor;
+    private final AtomicInteger counter = new AtomicInteger();
+
+    public MyAsyncProducer(MyAsyncEndpoint endpoint) {
+        super(endpoint);
+        this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
"MyProducer");
+    }
+
+    public MyAsyncEndpoint getEndpoint() {
+        return (MyAsyncEndpoint) super.getEndpoint();
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        executor.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+
+                LOG.info("Simulating a task which takes " + getEndpoint().getDelay() + "
millis to reply");
+                Thread.sleep(getEndpoint().getDelay());
+
+                int count = counter.incrementAndGet();
+                if (getEndpoint().getFailFirstAttempts() >= count) {
+                    LOG.info("Simulating a failure at attempt " + count);
+                    exchange.setException(new CamelExchangeException("Simulated error at
attempt " + count, exchange));
+                } else {
+                    String reply = getEndpoint().getReply();
+                    exchange.getOut().setBody(reply);
+                    // propagate headers
+                    exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+                    LOG.info("Setting reply " + reply);
+                }
+
+                LOG.info("Callback done(false)");
+                callback.done(false);
+                return null;
+            }
+        });
+
+        // indicate from this point forward its being routed asynchronously
+        LOG.info("Task submitted, now tell Camel routing engine to that this Exchange is
being continued asynchronously");
+        return false;
+    }
+
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MyInOutTestConsumer.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ *
+ * @author sully6768
+ */
+public class MyInOutTestConsumer implements MessageListener {
+    private static int ackMode;
+    private static String clientQueueName;
+
+    private boolean transacted;
+    private MessageProducer producer;
+
+    static {
+        clientQueueName = "client.messages";
+        ackMode = Session.AUTO_ACKNOWLEDGE;
+    }
+    public MyInOutTestConsumer(ConnectionFactory connectionFactory) {
+        Connection connection;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            Session session = connection.createSession(transacted, ackMode);
+            Destination adminQueue = session.createQueue(clientQueueName);
+
+            //Setup a message producer to send message to the queue the server is consuming
from
+            this.producer = session.createProducer(adminQueue);
+            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            //Create a temporary queue that this client will listen for responses on then
create a consumer
+            //that consumes message from this temporary queue...for a real application a
client should reuse
+            //the same temp queue for each message to the server...one temp queue per client
+            Destination tempDest = session.createTemporaryQueue();
+            MessageConsumer responseConsumer = session.createConsumer(tempDest);
+
+            //This class will handle the messages to the temp queue as well
+            responseConsumer.setMessageListener(this);
+
+            //Now create the actual message you want to send
+            TextMessage txtMessage = session.createTextMessage();
+            txtMessage.setText("MyProtocolMessage");
+
+            //Set the reply to field to the temp queue you created above, this is the queue
the server
+            //will respond to
+            txtMessage.setJMSReplyTo(tempDest);
+
+            //Set a correlation ID so when you get a response you know which sent message
the response is for
+            //If there is never more than one outstanding message to the server then the
+            //same correlation ID can be used for all the messages...if there is more than
one outstanding
+            //message to the server you would presumably want to associate the correlation
ID with this
+            //message somehow...a Map works good
+            String correlationId = this.createRandomString();
+            txtMessage.setJMSCorrelationID(correlationId);
+            this.producer.send(txtMessage);
+        } catch (JMSException e) {
+            //Handle the exception appropriately
+        }
+    }
+
+    private String createRandomString() {
+        Random random = new Random(System.currentTimeMillis());
+        long randomLong = random.nextLong();
+        return Long.toHexString(randomLong);
+    }
+
+    public void onMessage(Message message) {
+        String messageText = null;
+        try {
+            if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
+                messageText = textMessage.getText();
+                System.out.println("messageText = " + messageText);
+            }
+        } catch (JMSException e) {
+            //Handle the exception appropriately
+        }
+    }
+    
+}

Added: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java (added)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/SjmsConnectionTestSupport.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.util.ObjectHelper;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO Add Class documentation for SjmsConnectionTestSupport
+ * 
+ */
+public abstract class SjmsConnectionTestSupport {
+
+    static {
+        System.setProperty("org.apache.activemq.default.directory.prefix", "target/activemq/");
+    }
+    
+    public static final String VM_BROKER_CONNECT_STRING = "vm://broker";
+    public static final String TCP_BROKER_CONNECT_STRING = "tcp://localhost:61616";
+    protected Logger logger = LoggerFactory.getLogger(getClass());
+    private ActiveMQConnectionFactory vmTestConnectionFactory;
+    private ActiveMQConnectionFactory testConnectionFactory;
+    private BrokerService brokerService;
+    private boolean persistenceEnabled;
+
+    public abstract String getConnectionUri();
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Before
+    public void setup() throws Exception {
+        if (ObjectHelper.isEmpty(getConnectionUri())
+                || getConnectionUri().startsWith("vm")) {
+            vmTestConnectionFactory = new ActiveMQConnectionFactory(
+                    VM_BROKER_CONNECT_STRING);
+        } else {
+            createBroker();
+        }
+    }
+
+    @After
+    public void teardown() throws Exception {
+
+        if (vmTestConnectionFactory != null) {
+            vmTestConnectionFactory = null;
+        }
+        if (testConnectionFactory != null) {
+            testConnectionFactory = null;
+        }
+        if (brokerService != null) {
+            destroyBroker();
+        }
+    }
+
+    /**
+     * Gets the ActiveMQConnectionFactory value of testConnectionFactory for
+     * this instance of SjmsConnectionTestSupport.
+     * 
+     * @return the testConnectionFactory
+     */
+    public ActiveMQConnectionFactory createTestConnectionFactory(String uri) {
+        ActiveMQConnectionFactory cf = null;
+        if (ObjectHelper.isEmpty(uri)) {
+            cf = new ActiveMQConnectionFactory(VM_BROKER_CONNECT_STRING);
+        } else {
+            cf = new ActiveMQConnectionFactory(uri);
+        }
+        return cf;
+    }
+
+    protected void createBroker() throws Exception {
+        String connectString = getConnectionUri();
+        if (ObjectHelper.isEmpty(connectString)) {
+            connectString = TCP_BROKER_CONNECT_STRING;
+        }
+        brokerService = new BrokerService();
+        brokerService.setPersistent(isPersistenceEnabled());
+        brokerService.addConnector(connectString);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    protected void destroyBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Sets the ActiveMQConnectionFactory value of testConnectionFactory for
+     * this instance of SjmsConnectionTestSupport.
+     * 
+     * @param testConnectionFactory
+     *            Sets ActiveMQConnectionFactory, default is TODO add default
+     */
+    public void setTestConnectionFactory(
+            ActiveMQConnectionFactory testConnectionFactory) {
+        this.testConnectionFactory = testConnectionFactory;
+    }
+
+    /**
+     * Gets the ActiveMQConnectionFactory value of testConnectionFactory for
+     * this instance of SjmsConnectionTestSupport.
+     * 
+     * @return the testConnectionFactory
+     */
+    public ActiveMQConnectionFactory getTestConnectionFactory() {
+        return testConnectionFactory;
+    }
+
+    /**
+     * Sets the boolean value of persistenceEnabled for this instance of
+     * SjmsConnectionTestSupport.
+     * 
+     * @param persistenceEnabled
+     *            Sets boolean, default is false
+     */
+    public void setPersistenceEnabled(boolean persistenceEnabled) {
+        this.persistenceEnabled = persistenceEnabled;
+    }
+
+    /**
+     * Gets the boolean value of persistenceEnabled for this instance of
+     * SjmsConnectionTestSupport.
+     * 
+     * @return the persistenceEnabled
+     */
+    public boolean isPersistenceEnabled() {
+        return persistenceEnabled;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/resources/log4j.properties?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/resources/log4j.properties (added)
+++ camel/trunk/components/camel-sjms/src/test/resources/log4j.properties Thu Aug  2 10:56:40 2012
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+# The logging properties used
+#
+log4j.rootLogger=INFO, file
+
+# Per-logger levels (already active); set a logger to debug for more detail
+log4j.logger.org.apache.activemq=info
+log4j.logger.org.apache.camel=info
+log4j.logger.org.apache.camel.converter=info
+log4j.logger.org.apache.camel.component.sjms=debug
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer
+
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-sjms-test.log
+log4j.appender.file.append=true
\ No newline at end of file

Modified: camel/trunk/components/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/pom.xml?rev=1368413&r1=1368412&r2=1368413&view=diff
==============================================================================
--- camel/trunk/components/pom.xml (original)
+++ camel/trunk/components/pom.xml Thu Aug  2 10:56:40 2012
@@ -82,7 +82,7 @@
     <module>camel-freemarker</module>
     <module>camel-ftp</module>
     <module>camel-gae</module>
-	<module>camel-gson</module>
+    <module>camel-gson</module>
     <module>camel-guava-eventbus</module>
     <module>camel-guice</module>
     <module>camel-hawtdb</module>
@@ -141,6 +141,7 @@
     <module>camel-sip</module>
     <module>camel-smpp</module>
     <module>camel-snmp</module>
+    <module>camel-sjms</module>
     <module>camel-soap</module>
     <module>camel-solr</module>
     <module>camel-spring-batch</module>

Modified: camel/trunk/platforms/karaf/features/src/main/resources/features.xml
URL: http://svn.apache.org/viewvc/camel/trunk/platforms/karaf/features/src/main/resources/features.xml?rev=1368413&r1=1368412&r2=1368413&view=diff
==============================================================================
--- camel/trunk/platforms/karaf/features/src/main/resources/features.xml (original)
+++ camel/trunk/platforms/karaf/features/src/main/resources/features.xml Thu Aug  2 10:56:40
2012
@@ -731,6 +731,14 @@
     <bundle dependency='true'>mvn:org.apache.shiro/shiro-core/${shiro-version}</bundle>
     <bundle>mvn:org.apache.camel/camel-shiro/${project.version}</bundle>
   </feature>
+  <feature name='camel-sjms' version='${project.version}' resolver='(obr)' start-level='50'>
+    <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-annotation_1.0_spec/${geronimo-annotation-spec-version}</bundle>
+    <!-- JTA is not currently supported by SJMS but is a required dependency of the Geronimo JMS Bundle -->
+    <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jta_1.1_spec/${geronimo-jta-spec-version}</bundle>
+    <bundle dependency='true'>mvn:org.apache.geronimo.specs/geronimo-jms_1.1_spec/${geronimo-jms-spec-version}</bundle>
+    <feature version='${project.version}'>camel-core</feature>
+    <bundle>mvn:org.apache.camel/camel-sjms/${project.version}</bundle>
+  </feature>
   <feature name='camel-smpp' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>
     <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jsmpp/${jsmpp-bundle-version}</bundle>



Mime
View raw message