activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r523047 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/pojo/ test/java/org/apache/camel/component/pojo/
Date Tue, 27 Mar 2007 19:29:29 GMT
Author: chirino
Date: Tue Mar 27 12:29:28 2007
New Revision: 523047

URL: http://svn.apache.org/viewvc?view=rev&rev=523047
Log:
Updated the Pojo component to take advantage of the consumer/producer stuff.
I think the test case now shows more clearly how things work.


Removed:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PollingPojoEndpoint.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpointResolver.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PollingPojoComponent.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java?view=diff&rev=523047&r1=523046&r2=523047
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
Tue Mar 27 12:29:28 2007
@@ -28,39 +28,36 @@
  * @version $Revision: 519973 $
  */
 public class PojoComponent implements Component<PojoExchange> {
-    protected final HashMap<String, Object> registry = new HashMap<String, Object>();
-    protected final HashMap<String, PojoEndpoint> activatedEndpoints = new HashMap<String,
PojoEndpoint>();
+	
+    protected final HashMap<String, Object> services = new HashMap<String, Object>();
+    protected final HashMap<String, PojoConsumer> consumers = new HashMap<String,
PojoConsumer>();
+    
     private CamelContext container;
 
-    public void registerPojo(String uri, Object pojo) {
-        registry.put(uri, pojo);
+    public void addService(String uri, Object pojo) {
+        services.put(uri, pojo);
     }
-
-    public Object lookupRegisteredPojo(String uri) {
-        return registry.get(uri);
+    public void removeService(String uri) {
+        services.remove(uri);
+        removeConsumer(uri);
     }
-
-    public void unregisterPojo(String uri) {
-        registry.remove(uri);
-        unregisterActivation(uri);
+    public Object getService(String uri) {
+        return services.get(uri);
     }
 
-    public void registerActivation(String uri, PojoEndpoint endpoint) {
-        activatedEndpoints.put(uri, endpoint);
+    void addConsumer(String uri, PojoConsumer endpoint) {
+        consumers.put(uri, endpoint);
     }
-
-    public void unregisterActivation(String uri) {
-        activatedEndpoints.remove(uri);
+    void removeConsumer(String uri) {
+        consumers.remove(uri);
     }
-
-    public PojoEndpoint lookupActivation(String uri) {
-        return activatedEndpoints.get(uri);
+    public PojoConsumer getConsumer(String uri) {
+        return consumers.get(uri);
     }
 
     public void setCamelContext(CamelContext container) {
         this.container = container;
     }
-
     public CamelContext getContainer() {
         return container;
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java?view=diff&rev=523047&r1=523046&r2=523047
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoConsumer.java
Tue Mar 27 12:29:28 2007
@@ -17,49 +17,86 @@
  */
 package org.apache.camel.component.pojo;
 
-import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-
-import java.lang.reflect.Proxy;
 import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.DefaultConsumer;
 
 /**
  * @version $Revision$
  */
-public class PojoConsumer extends DefaultConsumer<PojoExchange> {
-    private final Object pojo;
+public class PojoConsumer extends DefaultConsumer<PojoExchange> implements InvocationHandler
{
+
+    private final PojoEndpoint endpoint;
 
-    public PojoConsumer(Endpoint<PojoExchange> endpoint, Processor<PojoExchange>
processor, Object pojo) {
+	public PojoConsumer(PojoEndpoint endpoint, Processor<PojoExchange> processor) {
         super(endpoint, processor);
-        this.pojo = pojo;
+		this.endpoint = endpoint;
+    }
+    
+    @Override
+    protected void doStart() throws Exception {
+    	PojoComponent component = endpoint.getComponent();
+    	PojoConsumer consumer = component.getConsumer(endpoint.getPojoId());
+    	if( consumer != null ) {
+    		throw new RuntimeCamelException("There is a consumer already registered for endpoint:
"+endpoint.getEndpointUri());
+    	}
+    	component.addConsumer(endpoint.getPojoId(), this);    	
     }
 
-
+    @Override
+    protected void doStop() throws Exception {
+    	PojoComponent component = endpoint.getComponent();
+    	component.removeConsumer(endpoint.getPojoId());    	
+    }
+    
+    /**
+     * Creates a Proxy which generates inbound exchanges on the consumer.
+     */
+    public Object createProxy(ClassLoader cl, Class interfaces[]) {
+        return Proxy.newProxyInstance(cl, interfaces, this);
+    }
+    /**
+     * Creates a Proxy which generates inbound exchanges on the consumer.
+     */
+    public Object createProxy(Class interfaces[]) {
+    	if( interfaces.length < 1 ) {
+    		throw new IllegalArgumentException("You must provide at least 1 interface class.");
+    	}
+        return createProxy(interfaces[0].getClassLoader(), interfaces);
+    }    
+    /**
+     * Creates a Proxy which generates inbound exchanges on the consumer.
+     */
+    @SuppressWarnings("unchecked")
+	public <T> T createProxy(ClassLoader cl, Class<T> interfaceClass) {
+        return (T) createProxy(cl, new Class[]{interfaceClass});
+    }
     /**
-     * Creates a Proxy object that can be used to deliver inbound PojoExchanges.
-     *
-     * @param interfaces
-     * @return
+     * Creates a Proxy which generates inbound exchanges on the consumer.
      */
-    public Object createInboundProxy(Class interfaces[]) {
-        return Proxy.newProxyInstance(pojo.getClass().getClassLoader(), interfaces, new InvocationHandler()
{
-            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
-                if (!isStarted()) {
-                    throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
-                }
-                PojoInvocation invocation = new PojoInvocation(proxy, method, args);
-                PojoExchange exchange = getEndpoint().createExchange();
-                exchange.setInvocation(invocation);
-                getProcessor().onExchange(exchange);
-                Throwable fault = exchange.getException();
-                if (fault != null) {
-                    throw new InvocationTargetException(fault);
-                }
-                return exchange.getOut().getBody();
-            }
-        });
+    @SuppressWarnings("unchecked")
+	public <T> T createProxy(Class<T> interfaceClass) {
+        return (T) createProxy(new Class[]{interfaceClass});
     }
+
+
+	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        if (!isStarted()) {
+            throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
+        }
+        PojoInvocation invocation = new PojoInvocation(proxy, method, args);
+        PojoExchange exchange = getEndpoint().createExchange();
+        exchange.setInvocation(invocation);
+        getProcessor().onExchange(exchange);
+        Throwable fault = exchange.getException();
+        if (fault != null) {
+            throw new InvocationTargetException(fault);
+        }
+        return exchange.getOut().getBody();
+	}
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java?view=diff&rev=523047&r1=523046&r2=523047
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
Tue Mar 27 12:29:28 2007
@@ -16,18 +16,15 @@
  */
 package org.apache.camel.component.pojo;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.Producer;
+import java.lang.reflect.InvocationTargetException;
+
 import org.apache.camel.Consumer;
+import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultProducer;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
 /**
  * Represents a pojo endpoint that uses reflection
  * to send messages around.
@@ -35,31 +32,37 @@
  * @version $Revision: 519973 $
  */
 public class PojoEndpoint extends DefaultEndpoint<PojoExchange> {
-    private final Object pojo;
     private final PojoComponent component;
+	private final String pojoId;
 
-    public PojoEndpoint(String uri, CamelContext container, PojoComponent component, Object
pojo) {
-        super(uri, container);
+    public PojoEndpoint(String uri, String pojoId, PojoComponent component) {
+        super(uri, component.getContainer());
+		this.pojoId = pojoId;
         this.component = component;
-        this.pojo = pojo;
     }
 
     public Producer<PojoExchange> createProducer() throws Exception {
+        final Object pojo = component.getService(pojoId);
+        if( pojo == null )
+        	throw new NoSuchEndpointException(getEndpointUri());
+        
         return startService(new DefaultProducer<PojoExchange>(this) {
             public void onExchange(PojoExchange exchange) {
-                invoke(exchange);
+                invoke(pojo, exchange);
             }
         });
     }
 
-    public Consumer<PojoExchange> createConsumer(Processor<PojoExchange> processor)
throws Exception {
-        return startService(new PojoConsumer(this, processor, pojo));
+    public Consumer<PojoExchange> createConsumer(Processor<PojoExchange> processor)
throws Exception {    	
+    	PojoConsumer consumer = new PojoConsumer(this, processor);
+        return startService(consumer);
     }
 
     /**
      * This causes us to invoke the endpoint Pojo using reflection.
+     * @param pojo 
      */
-    public void invoke(PojoExchange exchange) {
+    public void invoke(Object pojo, PojoExchange exchange) {
         PojoInvocation invocation = exchange.getInvocation();
         try {
             Object response = invocation.getMethod().invoke(pojo, invocation.getArgs());
@@ -80,14 +83,12 @@
         return new PojoExchange(getContext());
     }
 
-    @Override
-    protected void doActivate() {
-        component.registerActivation(getEndpointUri(), this);
-    }
-
-    @Override
-    protected void doDeactivate() {
-        component.unregisterActivation(getEndpointUri());
-    }
+	public PojoComponent getComponent() {
+		return component;
+	}
+
+	public String getPojoId() {
+		return pojoId;
+	}
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpointResolver.java?view=diff&rev=523047&r1=523046&r2=523047
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpointResolver.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpointResolver.java
Tue Mar 27 12:29:28 2007
@@ -53,8 +53,8 @@
 	public PojoEndpoint resolveEndpoint(CamelContext container, String uri) {
 		String id[] = getEndpointId(uri);        
     	PojoComponent component = resolvePojoComponent(container, id[0]);
-        Object pojo = component.lookupRegisteredPojo(id[1]);
-		return new PojoEndpoint(uri, container, component, pojo);
+        Object pojo = component.getService(id[1]);
+		return new PojoEndpoint(uri, id[1], component);
     }
 
 	private PojoComponent resolvePojoComponent(CamelContext container, String componentName)
{

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PollingPojoComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PollingPojoComponent.java?view=diff&rev=523047&r1=523046&r2=523047
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PollingPojoComponent.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PollingPojoComponent.java
Tue Mar 27 12:29:28 2007
@@ -38,7 +38,7 @@
      *             terminated.
      */
     public void registerAndSchedulePojo(String uri,Object pojo,PollingSchedule schedule){
-        super.registerPojo(uri,pojo);
+        super.addService(uri,pojo);
         schedules.put(uri,schedule);
     }
 
@@ -54,7 +54,7 @@
      *             terminated.
      */
     public void registerAndSchedulePojo(String uri,Object pojo,Date time){
-        super.registerPojo(uri,pojo);
+        super.addService(uri,pojo);
         PollingSchedule schedule=new PollingSchedule(uri,time);
         schedules.put(uri,schedule);
     }
@@ -72,7 +72,7 @@
      *             terminated.
      */
     public void registerAndScheulePojo(String uri,Object pojo,Date firstTime,long period){
-        super.registerPojo(uri,pojo);
+        super.addService(uri,pojo);
         PollingSchedule schedule=new PollingSchedule(uri,firstTime,period);
         schedules.put(uri,schedule);
     }
@@ -106,7 +106,7 @@
      *             terminated.
      */
     public void registerAndScheulePojo(String uri,Object pojo,long delay,long period){
-        super.registerPojo(uri,pojo);
+        super.addService(uri,pojo);
         PollingSchedule schedule=new PollingSchedule(uri,delay,period);
         schedules.put(uri,schedule);
     }
@@ -140,7 +140,7 @@
      *             terminated.
      */
     public void registerAndScheulePojoAtFixedRate(String uri,Object pojo,Date firstTime,long
period){
-        super.registerPojo(uri,pojo);
+        super.addService(uri,pojo);
         PollingSchedule schedule=new PollingSchedule(uri,firstTime,-1,period,true);
         schedules.put(uri,schedule);
     }
@@ -175,14 +175,15 @@
      *             terminated.
      */
     public void registerAndScheulePojoAtFixedRate(String uri,Object pojo,long delay,long
period){
-        super.registerPojo(uri,pojo);
+        super.addService(uri,pojo);
         PollingSchedule schedule=new PollingSchedule(uri,null,delay,period,true);
         schedules.put(uri,schedule);
     }
 
     public void registerActivation(String uri,PojoEndpoint endpoint){
-        super.registerActivation(uri,endpoint);
-        if(endpoint instanceof PollingPojoEndpoint){
+//        super.registerActivation(uri,endpoint);
+//        if(endpoint instanceof PollingPojoEndpoint){
+        if(endpoint instanceof PojoEndpoint){
             PollingSchedule schedule=schedules.get(uri);
             if(schedule!=null){
                 schedule.activate((Runnable)endpoint);
@@ -190,8 +191,8 @@
         }
     }
 
-    public void unregisterActivation(String uri){
-        super.unregisterActivation(uri);
+    public void removeConsumer(String uri){
+        super.removeConsumer(uri);
         PollingSchedule schedule=schedules.remove(uri);
         if(schedule!=null){
             schedule.deactivate();

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java?view=diff&rev=523047&r1=523046&r2=523047
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/pojo/PojoRouteTest.java
Tue Mar 27 12:29:28 2007
@@ -17,15 +17,15 @@
  */
 package org.apache.camel.component.pojo;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.TestCase;
+
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.processor.InterceptorProcessor;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * @version $Revision: 520220 $
  */
@@ -36,8 +36,7 @@
         CamelContext container = new DefaultCamelContext();
         
         PojoComponent component = new PojoComponent();
-        component.registerPojo("hello", new SayService("Hello!"));
-        component.registerPojo("bye", new SayService("Good Bye!"));
+        component.addService("bye", new SayService("Good Bye!"));
         container.addComponent("default", component);
         
         final AtomicInteger hitCount = new AtomicInteger();
@@ -52,34 +51,19 @@
         container.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("pojo:default:hello").intercept(tracingInterceptor).target().to("pojo:default:bye");
-                
-//                from("pojo:default:bye").intercept(tracingInterceptor).target().to("pojo:default:hello");
             }
         });
 
         
         container.activateEndpoints();
 
-        /* TODO
-        
         // now lets fire in a message
-        PojoEndpoint endpoint = (PojoEndpoint) container.resolveEndpoint("pojo:default:hello");
-        ISay proxy = (ISay) endpoint.createInboundProxy(new Class[]{ISay.class});
+        PojoConsumer consumer = component.getConsumer("hello");        
+        ISay proxy = consumer.createProxy(ISay.class);
         String rc = proxy.say();
         assertEquals("Good Bye!", rc);
-
-        try {
-			endpoint = (PojoEndpoint) container.resolveEndpoint("pojo:default:bye");
-			proxy = (ISay) endpoint.createInboundProxy(new Class[]{ISay.class});
-			rc = proxy.say();
-			assertEquals("Hello!", rc);
-            fail("Should have thrown an exception as we are using an inactive endpoint");
-
-        } catch (IllegalStateException expected) {
-			// since bye is not active.
-		}
-		*/
-
+        assertEquals(1, hitCount.get());
+        
         container.deactivateEndpoints();
     }
 }



Mime
View raw message