Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 46323 invoked from network); 20 Mar 2007 05:44:47 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Mar 2007 05:44:46 -0000 Received: (qmail 96006 invoked by uid 500); 20 Mar 2007 05:44:54 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 95989 invoked by uid 500); 20 Mar 2007 05:44:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 95980 invoked by uid 99); 20 Mar 2007 05:44:54 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Mar 2007 22:44:54 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Mar 2007 22:44:45 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 8FAF41A983E; Mon, 19 Mar 2007 22:44:25 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r520287 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/queue/ camel-core/src/test/java/org/apache/camel/queue/ camel-jms/src/m... 
Date: Tue, 20 Mar 2007 05:44:25 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070320054425.8FAF41A983E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Mar 19 22:44:24 2007 New Revision: 520287 URL: http://svn.apache.org/viewvc?view=rev&rev=520287 Log: Got rid of the activate/deactivate methods on Component since they look better on Endpoint.. but I did add similar methods to the Container. Also replicated the JmsRouteTest as a QueueRouteTest Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java Mon Mar 19 22:44:24 2007 @@ -17,15 +17,15 @@ */ package org.apache.camel; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.DefaultEndpointResolver; -import org.apache.camel.impl.DefaultExchangeConverter; - import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultEndpointResolver; +import org.apache.camel.impl.DefaultExchangeConverter; + /** * Represents the container used to configure routes and the policies to use. * @@ -37,19 +37,34 @@ private EndpointResolver endpointResolver; private ExchangeConverter exchangeConverter; private Map components = new HashMap(); + private Map, Processor> routes; + + /** + * Activates all the starting endpoints in that were added as routes. + */ + public void activateEndpoints() { + for (Map.Entry, Processor> entry : routes.entrySet()) { + Endpoint endpoint = entry.getKey(); + Processor processor = entry.getValue(); + endpoint.activate(processor); + } + } + + /** + * Deactivates all the starting endpoints in that were added as routes. 
+ */ + public void deactivateEndpoints() { + for (Endpoint endpoint : routes.keySet()) { + endpoint.deactivate(); + } + } // Builder APIs //----------------------------------------------------------------------- public void routes(RouteBuilder builder) { // lets now add the routes from the builder builder.setContainer(this); - Map, Processor> routeMap = builder.getRouteMap(); - Set, Processor>> entries = routeMap.entrySet(); - for (Map.Entry, Processor> entry : entries) { - Endpoint endpoint = entry.getKey(); - Processor processor = entry.getValue(); - endpoint.setInboundProcessor(processor); - } + routes = builder.getRouteMap(); } public void routes(final RouteFactory factory) { @@ -64,18 +79,18 @@ /** * Adds a component to the container if there is not currently a component already registered. */ - public void addComponent(String componentName, final Component> component) { + public void addComponent(String componentName, final Component component) { // TODO provide a version of this which barfs if the component is registered multiple times - getOrCreateComponent(componentName, new Callable>>() { - public Component> call() throws Exception { + getOrCreateComponent(componentName, new Callable>() { + public Component call() throws Exception { return component; } }); } - /** + /**O * Resolves the given URI to an endpoint */ public Endpoint endpoint(String uri) { @@ -121,7 +136,7 @@ return new DefaultExchangeConverter(); } - public Component getOrCreateComponent(String componentName, Callable>> factory) { + public Component getOrCreateComponent(String componentName, Callable> factory) { synchronized (components) { Component component = components.get(componentName); if (component == null) { Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=520287&r1=520286&r2=520287 
============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Mon Mar 19 22:44:24 2007 @@ -21,23 +21,12 @@ * * @version $Revision: 519901 $ */ -public interface Component> { +public interface Component { /** * The CamelContainer is injected into the component when it is added to it */ void setContainer(CamelContainer container); - /** - * Asks the component to activate the delivery of {@link Exchange} objects - * from the {@link Endpoint} to the {@link Processor}. - */ - void activate(EP endpoint, Processor processor); - - /** - * Stops the delivery of messages from a previously activated - * {@link Endpoint}. - */ - void deactivate(EP endpoint); } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Mon Mar 19 22:44:24 2007 @@ -16,6 +16,7 @@ */ package org.apache.camel; + /** * Represents an endpoint on which messages can be exchanged * @@ -32,11 +33,6 @@ * Sends the message exchange to this endpoint */ void send(E exchange); - - /** - * Sets the processor for inbound messages - */ - void setInboundProcessor(Processor processor); /** * Create a new exchange for communicating with this endpoint @@ -45,9 +41,13 @@ /** - * Called by the container when an endpoint is activiated + * Called by the container to Activate the endpoint. 
Once activated, + * the endpoint will start delivering messages inbound exchanges + * it receives to the specified processor. + * + * @throws IllegalStateException is the Endpoint has already been activated. */ - void activate(); + public void activate(Processor processor) throws IllegalStateException; /** * Called by the container when the endpoint is deactivated Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Mar 19 22:44:24 2007 @@ -77,9 +77,12 @@ } - public void activate() { + public void activate(Processor inboundProcessor) { if (activated.compareAndSet(false, true)) { + this.inboundProcessor = inboundProcessor; doActivate(); + } else { + throw new IllegalStateException("Endpoint is already active: "+getEndpointUri()); } } public void deactivate() { @@ -97,7 +100,6 @@ public void setInboundProcessor(Processor inboundProcessor) { this.inboundProcessor = inboundProcessor; - activate(); } /** Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java Mon Mar 19 22:44:24 2007 
@@ -42,7 +42,6 @@ return resolver.resolveEndpoint(container, uri); } - public Component resolveComponent(CamelContainer container, String uri) { EndpointResolver resolver = getDelegate(uri); return resolver.resolveComponent(container, uri); Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java Mon Mar 19 22:44:24 2007 @@ -16,14 +16,12 @@ */ package org.apache.camel.queue; -import org.apache.camel.CamelContainer; -import org.apache.camel.Component; -import org.apache.camel.Processor; - import java.util.HashMap; -import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.camel.CamelContainer; +import org.apache.camel.Component; /** * Represents the component that manages {@link QueueEndpoint}. 
It holds the @@ -32,45 +30,17 @@ * @org.apache.xbean.XBean * @version $Revision: 519973 $ */ -public class QueueComponent implements Component> { +public class QueueComponent implements Component { - private HashMap> registry = new HashMap>(); - private HashMap, Activation> activations = new HashMap, Activation>(); + private HashMap> registry = new HashMap>(); private CamelContainer container; public void setContainer(CamelContainer container) { this.container = container; } - class Activation implements Runnable { - private final QueueEndpoint endpoint; - AtomicBoolean stop = new AtomicBoolean(); - private Thread thread; - - public Activation(QueueEndpoint endpoint) { - this.endpoint = endpoint; - } - - public void run() { - while(!stop.get()) { - - } - } - - public void start() { - thread = new Thread(this, endpoint.getEndpointUri()); - thread.setDaemon(true); - thread.start(); - } - - public void stop() throws InterruptedException { - stop.set(true); - thread.join(); - } - } - - synchronized public Queue getOrCreateQueue(String uri) { - Queue queue = registry.get(uri); + synchronized public BlockingQueue getOrCreateQueue(String uri) { + BlockingQueue queue = registry.get(uri); if( queue == null ) { queue = createQueue(); registry.put(uri, queue); @@ -78,30 +48,13 @@ return queue; } - private Queue createQueue() { + protected BlockingQueue createQueue() { return new LinkedBlockingQueue(); } - public void activate(QueueEndpoint endpoint, Processor processor) { - Activation activation = activations.get(endpoint); - if( activation!=null ) { - throw new IllegalArgumentException("Endpoint "+endpoint.getEndpointUri()+" has already been activated."); - } - - activation = new Activation(endpoint); - activation.start(); + public CamelContainer getContainer() { + return container; } - public void deactivate(QueueEndpoint endpoint) { - Activation activation = activations.remove(endpoint); - if( activation==null ) { - throw new IllegalArgumentException("Endpoint 
"+endpoint.getEndpointUri()+" is not activate."); - } - try { - activation.stop(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java Mon Mar 19 22:44:24 2007 @@ -16,24 +16,28 @@ */ package org.apache.camel.queue; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.camel.CamelContainer; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; -import java.util.Queue; - /** - * Represents a queue endpoint that uses a {@link Queue} + * Represents a queue endpoint that uses a {@link BlockingQueue} * object to process inbound exchanges. 
* * @org.apache.xbean.XBean * @version $Revision: 519973 $ */ public class QueueEndpoint extends DefaultEndpoint { - private Queue queue; + private BlockingQueue queue; + private org.apache.camel.queue.QueueEndpoint.Activation activation; - public QueueEndpoint(String uri, CamelContainer container, Queue queue) { + public QueueEndpoint(String uri, CamelContainer container, BlockingQueue queue) { super(uri, container); this.queue = queue; } @@ -55,5 +59,60 @@ public Queue getQueue() { return queue; + } + + class Activation implements Runnable { + AtomicBoolean stop = new AtomicBoolean(); + private Thread thread; + + public void run() { + while(!stop.get()) { + E exchange=null; + try { + exchange = queue.poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + break; + } + if( exchange !=null ) { + try { + getInboundProcessor().onExchange(exchange); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + } + + public void start() { + thread = new Thread(this, getEndpointUri()); + thread.setDaemon(true); + thread.start(); + } + + public void stop() throws InterruptedException { + stop.set(true); + thread.join(); + } + + @Override + public String toString() { + return "Activation: "+getEndpointUri(); + } + } + + @Override + protected void doActivate() { + activation = new Activation(); + activation.start(); + } + + @Override + protected void doDeactivate() { + try { + activation.stop(); + activation=null; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java (original) +++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java Mon Mar 19 22:44:24 2007 @@ -16,7 +16,7 @@ */ package org.apache.camel.queue; -import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import org.apache.camel.CamelContainer; @@ -61,7 +61,7 @@ public Endpoint resolveEndpoint(CamelContainer container, String uri) { String id[] = getEndpointId(uri); QueueComponent component = resolveQueueComponent(container, id[0]); - Queue queue = component.getOrCreateQueue(id[1]); + BlockingQueue queue = component.getOrCreateQueue(id[1]); return new QueueEndpoint(uri, container, queue); } @@ -82,8 +82,8 @@ @SuppressWarnings("unchecked") private QueueComponent resolveQueueComponent(CamelContainer container, String componentName) { - Component rc = container.getOrCreateComponent(componentName, new Callable>>(){ - public Component> call() throws Exception { + Component rc = container.getOrCreateComponent(componentName, new Callable>(){ + public Component call() throws Exception { return new QueueComponent(); }}); return (QueueComponent) rc; Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java?view=auto&rev=520287 ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java (added) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java Mon Mar 19 22:44:24 2007 @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.queue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.apache.camel.CamelContainer; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultExchange; + +/** + * @version $Revision: 520220 $ + */ +public class QueueRouteTest extends TestCase { + + static class StringExchange extends DefaultExchange { + } + + public void testJmsRoute() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + CamelContainer container = new CamelContainer(); + + // lets add some routes + container.routes(new RouteBuilder() { + public void configure() { + from("queue:test.a").to("queue:test.b"); + from("queue:test.b").process(new Processor() { + public void onExchange(StringExchange exchange) { + System.out.println("Received exchange: " + exchange.getRequest()); + latch.countDown(); + } + }); + } + }); + + + container.activateEndpoints(); + + // now lets fire in a message + Endpoint endpoint = container.endpoint("queue:test.a"); + StringExchange exchange = new StringExchange(); + exchange.setHeader("cheese", 123); + endpoint.send(exchange); + + // now lets sleep for a while + boolean received = latch.await(5, TimeUnit.SECONDS); + assertTrue("Did not recieve the message!", 
received); + + container.deactivateEndpoints(); + } +} Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (original) +++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 22:44:24 2007 @@ -32,7 +32,7 @@ /** * @version $Revision$ */ -public class JmsComponent implements Component { +public class JmsComponent implements Component { public static final String QUEUE_PREFIX = "queue/"; public static final String TOPIC_PREFIX = "topic/"; Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=diff&rev=520287&r1=520286&r2=520287 ============================================================================== --- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (original) +++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 22:44:24 2007 @@ -55,6 +55,9 @@ } }); + + container.activateEndpoints(); + // now lets fire in a message Endpoint endpoint = container.endpoint("jms:activemq:test.a"); JmsExchange exchange2 = endpoint.createExchange(); @@ -66,7 +69,6 @@ boolean received = latch.await(5, TimeUnit.SECONDS); assertTrue("Did not recieve the message!", received); - // TODO - //container.stop(); + container.deactivateEndpoints(); } }