activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r573615 - in /activemq/trunk: ./ activemq-core/src/main/java/org/apache/activemq/camel/component/ activemq-core/src/main/resources/META-INF/services/org/apache/camel/ activemq-core/src/main/resources/META-INF/services/org/apache/camel/compo...
Date Fri, 07 Sep 2007 15:44:48 GMT
Author: chirino
Date: Fri Sep  7 08:44:45 2007
New Revision: 573615

URL: http://svn.apache.org/viewvc?rev=573615&view=rev
Log:
Moved the camel-activemq module into the activemq-core module to break the circular dependency between the activemq and camel projects.


Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html   (with props)
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java   (with props)
Modified:
    activemq/trunk/assembly/pom.xml
    activemq/trunk/pom.xml

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.jms.JmsComponent;
+import org.apache.camel.component.jms.JmsConfiguration;
+
+/**
+ * The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
+ *
+ * @version $Revision$
+ */
+public class ActiveMQComponent extends JmsComponent {
+    /**
+     * Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
+     *
+     * @return the created component
+     */
+    public static ActiveMQComponent activeMQComponent() {
+        return new ActiveMQComponent();
+    }
+
+    /**
+     * Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
+     * connecting to the given <a href="http://activemq.apache.org/configuring-transports.html">broker URL</a>
+     *
+     * @param brokerURL the URL to connect to
+     * @return the created component
+     */
+    public static ActiveMQComponent activeMQComponent(String brokerURL) {
+        ActiveMQComponent answer = new ActiveMQComponent();
+        answer.getConfiguration().setBrokerURL(brokerURL);
+        return answer;
+    }
+
+    public ActiveMQComponent() {
+    }
+
+    public ActiveMQComponent(CamelContext context) {
+        super(context);
+    }
+
+    public ActiveMQComponent(ActiveMQConfiguration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public ActiveMQConfiguration getConfiguration() {
+        return (ActiveMQConfiguration) super.getConfiguration();
+    }
+
+    public void setBrokerURL(String brokerURL) {
+        getConfiguration().setBrokerURL(brokerURL);
+    }
+
+
+    @Override
+    protected JmsConfiguration createConfiguration() {
+        return new ActiveMQConfiguration();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.spring.ActiveMQConnectionFactory;
+import org.apache.camel.component.jms.JmsConfiguration;
+
+import javax.jms.ConnectionFactory;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQConfiguration extends JmsConfiguration {
+    private String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+
+    public ActiveMQConfiguration() {
+    }
+
+    public String getBrokerURL() {
+        return brokerURL;
+    }
+
+    /**
+     * Sets the broker URL to use to connect to ActiveMQ using the
+     * <a href="http://activemq.apache.org/configuring-transports.html">ActiveMQ URI format</a>
+     *
+     * @param brokerURL the URL of the broker.
+     */
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
+
+    @Override
+    public ActiveMQConnectionFactory getListenerConnectionFactory() {
+        return (ActiveMQConnectionFactory) super.getListenerConnectionFactory();
+    }
+
+    @Override
+    public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
+        if (listenerConnectionFactory instanceof ActiveMQConnectionFactory) {
+            super.setListenerConnectionFactory(listenerConnectionFactory);
+        }
+        else {
+            throw new IllegalArgumentException("ConnectionFactory " + listenerConnectionFactory
+                    + " is not an instanceof " + ActiveMQConnectionFactory.class.getName());
+        }
+    }
+
+    @Override
+    protected ConnectionFactory createListenerConnectionFactory() {
+        ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory();
+        answer.setBrokerURL(getBrokerURL());
+        return answer;
+    }
+
+    @Override
+    protected ConnectionFactory createTemplateConnectionFactory() {
+        return new PooledConnectionFactory(getListenerConnectionFactory());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.camel.Converter;
+
+/**
+ * @version $Revision$
+ */
+@Converter
+public class ActiveMQConverter {
+    /**
+     * Converts a URL in ActiveMQ syntax to a destination such as to support
+     * "queue://foo.bar" or 'topic://bar.whatnot". Things default to queues if no scheme.
+     *
+     * This allows ActiveMQ destinations to be passed around as Strings and converted back again.
+     *
+     * @param name is the name of the queue or the full URI using prefixes queue:// or topic://
+     * @return the ActiveMQ destination
+     */
+    @Converter
+    public static ActiveMQDestination toDestination(String name) {
+        return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQConverter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.*;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.JmsBinding;
+
+import javax.jms.MessageNotWriteableException;
+import javax.jms.JMSException;
+import java.io.Serializable;
+
+/**
+ * @version $Revision$
+ */
+@Converter
+public class ActiveMQMessageConverter {
+    private JmsBinding binding = new JmsBinding();
+
+    /**
+     * Converts the inbound message exchange to an ActiveMQ JMS message
+     *
+     * @return the ActiveMQ message
+     */
+    @Converter
+    public ActiveMQMessage toMessage(Exchange exchange) throws JMSException {
+        ActiveMQMessage message = createActiveMQMessage(exchange);
+        getBinding().appendJmsProperties(message, exchange);
+        return message;
+    }
+
+    private static ActiveMQMessage createActiveMQMessage(Exchange exchange) throws JMSException {
+        Object body = exchange.getIn().getBody();
+        if (body instanceof String) {
+            ActiveMQTextMessage answer = new ActiveMQTextMessage();
+            answer.setText((String) body);
+            return answer;
+        } else if (body instanceof Serializable) {
+            ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+            answer.setObject((Serializable) body);
+            return answer;
+        } else {
+            return new ActiveMQMessage();
+        }
+
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public JmsBinding getBinding() {
+        return binding;
+    }
+
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQMessageConverter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultComponent;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
+ *
+ * @version $Revision$
+ */
+public class JournalComponent extends DefaultComponent<Exchange> {
+
+    @Override
+    protected Endpoint<Exchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {        
+        JournalEndpoint endpoint = new JournalEndpoint(uri, this, new File(remaining));
+        setProperties(endpoint, parameters);
+        return endpoint;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalComponent.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JournalEndpoint extends DefaultEndpoint<Exchange> {
+
+    private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class);
+
+    private final File directory;
+    private final AtomicReference<DefaultConsumer<Exchange>> consumer = new AtomicReference<DefaultConsumer<Exchange>>();
+    private final Object activationMutex = new Object();
+    private int referenceCount;
+    private AsyncDataManager dataManager;
+    private Thread thread;
+    private Location lastReadLocation;
+    private long idleDelay = 1000;
+    private boolean syncProduce = true;
+    private boolean syncConsume;
+
+    public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
+        super(uri, journalComponent.getCamelContext());
+        this.directory = directory;
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
+        return new DefaultConsumer<Exchange>(this, processor) {
+            @Override
+            public void start() throws Exception {
+                super.start();
+                activateConsumer(this);
+            }
+
+            @Override
+            public void stop() throws Exception {
+                deactivateConsumer(this);
+                super.stop();
+            }
+        };
+    }
+
+    protected void decrementReference() throws IOException {
+        synchronized (activationMutex) {
+            referenceCount--;
+            if (referenceCount == 0) {
+                LOG.debug("Closing data manager: " + directory);
+                LOG.debug("Last mark at: " + lastReadLocation);
+                dataManager.close();
+                dataManager = null;
+            }
+        }
+    }
+
+    protected void incrementReference() throws IOException {
+        synchronized (activationMutex) {
+            referenceCount++;
+            if (referenceCount == 1) {
+                LOG.debug("Opening data manager: " + directory);
+                dataManager = new AsyncDataManager();
+                dataManager.setDirectory(directory);
+                dataManager.start();
+
+                lastReadLocation = dataManager.getMark();
+                LOG.debug("Last mark at: " + lastReadLocation);
+            }
+        }
+    }
+
+    protected void deactivateConsumer(DefaultConsumer<Exchange> consumer) throws IOException {
+        synchronized (activationMutex) {
+            if (this.consumer.get() != consumer) {
+                throw new RuntimeCamelException("Consumer was not active.");
+            }
+            this.consumer.set(null);
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+            decrementReference();
+        }
+    }
+
+    protected void activateConsumer(DefaultConsumer<Exchange> consumer) throws IOException {
+        synchronized (activationMutex) {
+            if (this.consumer.get() != null) {
+                throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
+            }
+            incrementReference();
+            this.consumer.set(consumer);
+            thread = new Thread() {
+                @Override
+                public void run() {
+                    dispatchToConsumer();
+                }
+            };
+            thread.setName("Dipatch thread: " + getEndpointUri());
+            thread.setDaemon(true);
+            thread.start();
+        }
+    }
+
+    protected void dispatchToConsumer() {
+        try {
+            DefaultConsumer<Exchange> consumer;
+            while ((consumer = this.consumer.get()) != null) {
+                // See if there is a new record to process
+                Location location = dataManager.getNextLocation(lastReadLocation);
+                if (location != null) {
+
+                    // Send it on.
+                    ByteSequence read = dataManager.read(location);
+                    Exchange exchange = createExchange();
+                    exchange.getIn().setBody(read);
+                    exchange.getIn().setHeader("journal", getEndpointUri());
+                    exchange.getIn().setHeader("location", location);
+                    consumer.getProcessor().process(exchange);
+
+                    // Setting the mark makes the data manager forget about
+                    // everything
+                    // before that record.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Consumed record at: " + location);
+                    }
+                    dataManager.setMark(location, syncConsume);
+                    lastReadLocation = location;
+                } else {
+                    // Avoid a tight CPU loop if there is no new record to read.
+                    LOG.debug("Sleeping due to no records being available.");
+                    Thread.sleep(idleDelay);
+                }
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
+
+    public Producer<Exchange> createProducer() throws Exception {
+        return new DefaultProducer<Exchange>(this) {
+            public void process(Exchange exchange) throws Exception {
+                incrementReference();
+                try {
+
+                    ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
+                    if (body == null) {
+                        byte[] bytes = exchange.getIn().getBody(byte[].class);
+                        if (bytes != null) {
+                            body = new ByteSequence(bytes);
+                        }
+                    }
+                    if (body == null) {
+                        throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
+                    }
+                    dataManager.write(body, syncProduce);
+
+                } finally {
+                    decrementReference();
+                }
+            }
+        };
+    }
+
+    public boolean isSyncConsume() {
+        return syncConsume;
+    }
+
+    public void setSyncConsume(boolean syncConsume) {
+        this.syncConsume = syncConsume;
+    }
+
+    public boolean isSyncProduce() {
+        return syncProduce;
+    }
+
+    public void setSyncProduce(boolean syncProduce) {
+        this.syncProduce = syncProduce;
+    }
+
+    boolean isOpen() {
+        synchronized (activationMutex) {
+            return referenceCount > 0;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/JournalEndpoint.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html Fri Sep  7 08:44:45 2007
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Defines the <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
+
+</body>
+</html>

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/camel/component/package.html
------------------------------------------------------------------------------
    svn:mime-type = text/html

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Fri Sep  7 08:44:45 2007
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.activemq.camel.component
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq Fri Sep  7 08:44:45 2007
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.activemq.camel.component.ActiveMQComponent

Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal (added)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/camel/component/activemq.journal Fri Sep  7 08:44:45 2007
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.activemq.camel.component.JournalComponent

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.spring.ActiveMQConnectionFactory;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.jms.JmsConsumer;
+import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.camel.component.jms.JmsProducer;
+import org.apache.camel.processor.Logger;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQConfigureTest extends ContextTestSupport {
+    
+    public void testJmsTemplateUsesPoolingConnectionFactory() throws Exception {
+        JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo");
+        JmsProducer producer = endpoint.createProducer();
+
+        JmsTemplate template = assertIsInstanceOf(JmsTemplate.class, producer.getTemplate());
+        assertIsInstanceOf(PooledConnectionFactory.class, template.getConnectionFactory());
+        assertEquals("pubSubDomain", false, template.isPubSubDomain());
+    }
+
+    public void testListenerContainerUsesSpringConnectionFactory() throws Exception {
+        JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:topic:test.foo");
+        JmsConsumer consumer = endpoint.createConsumer(new Logger());
+
+        AbstractMessageListenerContainer listenerContainer = consumer.getListenerContainer();
+        assertIsInstanceOf(ActiveMQConnectionFactory.class, listenerContainer.getConnectionFactory());
+        assertEquals("pubSubDomain", true, listenerContainer.isPubSubDomain());
+
+    }
+
+    @Override
+    protected JmsEndpoint resolveMandatoryEndpoint(String uri) {
+        Endpoint endpoint = super.resolveMandatoryEndpoint(uri);
+        return assertIsInstanceOf(JmsEndpoint.class, endpoint);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQConfigureTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
+import org.apache.camel.component.jms.JmsExchange;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.mock.AssertionClause;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Message;
+import javax.jms.Destination;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQJmsHeaderRouteTest extends ContextTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(ActiveMQJmsHeaderRouteTest.class);
+
+    protected Object expectedBody = "<time>" + new Date() + "</time>";
+    protected ActiveMQQueue replyQueue = new ActiveMQQueue("test.reply.queue");
+    protected String correlationID = "ABC-123";
+    protected String messageType = getClass().getName();
+
+    public void testForwardingAMessageAcrossJMSKeepingCustomJMSHeaders() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+
+        resultEndpoint.expectedBodiesReceived(expectedBody);
+        AssertionClause firstMessageExpectations = resultEndpoint.message(0);
+        firstMessageExpectations.header("cheese").isEqualTo(123);
+        firstMessageExpectations.header("JMSReplyTo").isEqualTo(replyQueue);
+        firstMessageExpectations.header("JMSCorrelationID").isEqualTo(correlationID);
+        firstMessageExpectations.header("JMSType").isEqualTo(messageType);
+
+        template.sendBodyAndHeader("activemq:test.a", expectedBody, "cheese", 123);
+
+        resultEndpoint.assertIsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        Exchange exchange = list.get(0);
+        Object replyTo = exchange.getIn().getHeader("JMSReplyTo");
+        LOG.info("Reply to is: " + replyTo);
+        Destination destination = assertIsInstanceOf(Destination.class, replyTo);
+        assertEquals("ReplyTo", replyQueue.toString(), destination.toString());
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        // START SNIPPET: example
+        camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
+        // END SNIPPET: example
+
+        return camelContext;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("activemq:test.a").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        // lets set the custom JMS headers using the JMS API
+                        JmsExchange jmsExchange = assertIsInstanceOf(JmsExchange.class, exchange);
+                        Message inMessage = jmsExchange.getInMessage();
+                        inMessage.setJMSReplyTo(replyQueue);
+                        inMessage.setJMSCorrelationID(correlationID);
+                        inMessage.setJMSType(messageType);
+                    }
+                }).to("activemq:test.b");
+
+                from("activemq:test.b").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQJmsHeaderRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
+import org.apache.camel.component.mock.AssertionClause;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Destination;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQReplyToHeaderUsingConverterTest extends ContextTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(ActiveMQReplyToHeaderUsingConverterTest.class);
+
+    protected Object expectedBody = "<time>" + new Date() + "</time>";
+    protected String replyQueueName = "queue://test.my.reply.queue";
+    protected String correlationID = "ABC-123";
+    protected String groupID = "GROUP-XYZ";
+    protected String messageType = getClass().getName();
+
+    public void testSendingAMessageFromCamelSetsCustomJmsHeaders() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+
+        resultEndpoint.expectedBodiesReceived(expectedBody);
+        AssertionClause firstMessage = resultEndpoint.message(0);
+        firstMessage.header("cheese").isEqualTo(123);
+        firstMessage.header("JMSCorrelationID").isEqualTo(correlationID);
+        firstMessage.header("JMSReplyTo").isEqualTo(ActiveMQConverter.toDestination(replyQueueName));
+        firstMessage.header("JMSType").isEqualTo(messageType);
+        firstMessage.header("JMSXGroupID").isEqualTo(groupID);
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("cheese", 123);
+        headers.put("JMSReplyTo", replyQueueName);
+        headers.put("JMSCorrelationID", correlationID);
+        headers.put("JMSType", messageType);
+        headers.put("JMSXGroupID", groupID);
+        template.sendBodyAndHeaders("activemq:test.a", expectedBody, headers);
+
+        resultEndpoint.assertIsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        Exchange exchange = list.get(0);
+        Message in = exchange.getIn();
+        Object replyTo = in.getHeader("JMSReplyTo");
+        LOG.info("Reply to is: " + replyTo);
+        Destination destination = assertIsInstanceOf(Destination.class, replyTo);
+        assertEquals("ReplyTo", replyQueueName, destination.toString());
+
+        assertMessageHeader(in, "cheese", 123);
+        assertMessageHeader(in, "JMSCorrelationID", correlationID);
+        assertMessageHeader(in, "JMSType", messageType);
+        assertMessageHeader(in, "JMSXGroupID", groupID);
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        // START SNIPPET: example
+        camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
+        // END SNIPPET: example
+
+        return camelContext;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("activemq:test.a").to("activemq:test.b");
+
+                from("activemq:test.b").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQReplyToHeaderUsingConverterTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
+import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQRouteTest extends ContextTestSupport {
+    protected MockEndpoint resultEndpoint;
+    protected String startEndpointUri = "activemq:queue:test.a";
+
+    public void testJmsRouteWithTextMessage() throws Exception {
+        String expectedBody = "Hello there!";
+
+        resultEndpoint.expectedBodiesReceived(expectedBody);
+        resultEndpoint.message(0).header("cheese").isEqualTo(123);
+
+        sendExchange(expectedBody);
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    protected void sendExchange(final Object expectedBody) {
+        template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese", 123);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        resultEndpoint = (MockEndpoint) context.getEndpoint("mock:result");
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        // START SNIPPET: example
+        camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
+        // END SNIPPET: example
+
+        return camelContext;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(startEndpointUri).to("activemq:queue:test.b");
+                from("activemq:queue:test.b").to("mock:result");
+
+                JmsEndpoint endpoint1 = (JmsEndpoint) endpoint("activemq:topic:quote.IONA");
+                endpoint1.getConfiguration().setTransacted(true);
+                from(endpoint1).to("mock:transactedClient");
+
+                JmsEndpoint endpoint2 = (JmsEndpoint) endpoint("activemq:topic:quote.IONA");
+                endpoint1.getConfiguration().setTransacted(true);
+                from(endpoint2).to("mock:nonTrasnactedClient");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+
+/**
+ * @version $Revision$
+ */
+public class JournalConfigureTest extends ContextTestSupport {
+
+    public void testDefaltConfig() throws Exception {
+        JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test");
+        assertEquals("directory", new File("target", "test"), endpoint.getDirectory());
+        assertEquals("syncConsume", false, endpoint.isSyncConsume());
+        assertEquals("syncProduce", true, endpoint.isSyncProduce());
+    }
+
+    public void testConfigViaOptions() throws Exception {
+        JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:target/test?syncConsume=true&syncProduce=false");
+        assertEquals("directory", new File("target", "test"), endpoint.getDirectory());
+        assertEquals("syncConsume", true, endpoint.isSyncConsume());
+        assertEquals("syncProduce", false, endpoint.isSyncProduce());
+    }
+
+    @Override
+    protected JournalEndpoint resolveMandatoryEndpoint(String uri) {
+        Endpoint endpoint = super.resolveMandatoryEndpoint(uri);
+        return assertIsInstanceOf(JournalEndpoint.class, endpoint);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalConfigureTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Used to get an idea of what kind of performance can be expected from 
+ * the journal.
+ * 
+ * @version $Revision$
+ */
+public class JournalRoutePerformance extends ContextTestSupport {
+
+    AtomicLong produceCounter = new AtomicLong();
+    AtomicLong consumeCounter = new AtomicLong();
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    public void testPerformance() throws Exception {
+
+        int payLoadSize = 1024;
+        int concurrentProducers = 50;
+        long delayBetweenSample = 1000;
+        long perfTestDuration = 1000 * 60; // 1 min
+
+        StringBuffer t = new StringBuffer();
+        for (int i = 0; i < payLoadSize; i++) {
+            t.append('a' + (i % 26));
+        }
+        final byte[] payload = t.toString().getBytes("UTF-8");
+
+        for (int i = 0; i < concurrentProducers; i++) {
+            Thread thread = new Thread("Producer: " + i) {
+                @Override
+                public void run() {
+                    while (running.get()) {
+                        template.sendBody("direct:in", payload);
+                        produceCounter.incrementAndGet();
+                    }
+                }
+            };
+            thread.start();
+        }
+
+        long produceTotal = 0;
+        long consumeTotal = 0;
+        long start = System.currentTimeMillis();
+        long end = start + perfTestDuration;
+        while (System.currentTimeMillis() < end) {
+            Thread.sleep(delayBetweenSample);
+            long totalTime = System.currentTimeMillis() - start;
+            long p = produceCounter.getAndSet(0);
+            long c = consumeCounter.getAndSet(0);
+            produceTotal += p;
+            consumeTotal += c;
+            System.out.println("Interval Produced " + stat(p, delayBetweenSample) + " m/s, Consumed " + stat(c, delayBetweenSample) + " m/s");
+            System.out.println("Total Produced " + stat(produceTotal, totalTime) + " m/s, Consumed " + stat(consumeTotal, totalTime) + " m/s");
+        }
+        running.set(false);
+
+    }
+
+    private String stat(long pd, long delayBetweenSample) {
+        return "" + (1.0 * pd / delayBetweenSample) * 1000.0;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:in").to("activemq.journal:target/perf-test");
+                from("activemq.journal:target/perf-test").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        consumeCounter.incrementAndGet();
+                    }
+                });
+            }
+        };
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRoutePerformance.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java?rev=573615&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java Fri Sep  7 08:44:45 2007
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel.component;
+
+import java.util.List;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.AssertionClause;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class JournalRouteTest extends ContextTestSupport {
+
+    public void testSimpleJournalRoute() throws Exception {
+
+        byte[] payload = "Hello World".getBytes();
+        
+        
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:out", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+        
+        AssertionClause firstMessageExpectations = resultEndpoint.message(0);
+        firstMessageExpectations.header("journal").isEqualTo("activemq.journal:target/test.a");
+        firstMessageExpectations.header("location").isNotNull();
+        firstMessageExpectations.body().isInstanceOf(ByteSequence.class);
+
+        template.sendBody("direct:in", payload);
+
+        resultEndpoint.assertIsSatisfied();
+
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        Exchange exchange = list.get(0);
+        ByteSequence body = (ByteSequence)exchange.getIn().getBody();
+        body.compact(); // trims the byte array to the actual size.
+        assertEquals("body", new String(payload), new String(body.data));
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:in").to("activemq.journal:target/test.a");
+                from("activemq.journal:target/test.a").to("mock:out");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/camel/component/JournalRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/assembly/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/pom.xml?rev=573615&r1=573614&r2=573615&view=diff
==============================================================================
--- activemq/trunk/assembly/pom.xml (original)
+++ activemq/trunk/assembly/pom.xml Fri Sep  7 08:44:45 2007
@@ -116,10 +116,6 @@
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-jms</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-activemq</artifactId>
-    </dependency>
     <!-- for the XML parsing -->
     <dependency>
       <groupId>javax.xml</groupId>

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=573615&r1=573614&r2=573615&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Fri Sep  7 08:44:45 2007
@@ -400,21 +400,8 @@
         <artifactId>camel-jms</artifactId>
         <version>${camel-version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.camel</groupId>
-        <artifactId>camel-activemq</artifactId>
-        <version>${camel-version}</version>
 
-        <!-- lets swap out the version of AMQ that camel released with -->
-        <exclusions>
-          <exclusion>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-core</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-
-			<!-- for the XML parsing -->
+      <!-- for the XML parsing -->
       <dependency>
         <groupId>javax.xml</groupId>
         <artifactId>jaxb-api</artifactId>



Mime
View raw message