activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r533794 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/spi/ camel-file/src/main/java/org/apache/camel/component/file/ camel-jpa/src/main/java/org/apache/camel/component/...
Date Mon, 30 Apr 2007 16:41:51 GMT
Author: jstrachan
Date: Mon Apr 30 09:41:50 2007
New Revision: 533794

URL: http://svn.apache.org/viewvc?view=rev&rev=533794
Log:
added some helper methods to make it easier to write properly configured polling consumers.
Also added an ExceptionHandler for asynchronous processors to deal with async exceptions
nicer

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
    activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
Mon Apr 30 09:41:50 2007
@@ -62,6 +62,10 @@
             return null;
         }
         if (parameters != null) {
+            if (endpoint instanceof PollingEndpoint) {
+                PollingEndpoint pollingEndpoint = (PollingEndpoint) endpoint;
+                pollingEndpoint.configureProperties(parameters);
+            }
             IntrospectionSupport.setProperties(endpoint, parameters);
         }
         return endpoint;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
Mon Apr 30 09:41:50 2007
@@ -21,6 +21,7 @@
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
 
 /**
@@ -29,6 +30,7 @@
 public class DefaultConsumer<E extends Exchange> extends ServiceSupport implements
Consumer<E> {
     private Endpoint<E> endpoint;
     private Processor<E> processor;
+    private ExceptionHandler exceptionHandler;
 
     public DefaultConsumer(Endpoint<E> endpoint, Processor<E> processor) {
         this.endpoint = endpoint;
@@ -43,11 +45,31 @@
         return processor;
     }
 
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(processor);
     }
 
     protected void doStart() throws Exception {
         ServiceHelper.startServices(processor);
+    }
+
+    /**
+     * Handles the given exception using the {@link #getExceptionHandler()}
+     *
+     * @param t the exception to handle
+     */
+    protected void handleException(Throwable t) {
+        getExceptionHandler().handleException(t);
     }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java?view=auto&rev=533794
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
Mon Apr 30 09:41:50 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.processor.Logger;
+import org.apache.camel.processor.LoggingLevel;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A default implementation of {@link ExceptionHandler} which uses a {@link Logger} to
+ * log to an arbitrary {@link Log} with some {@link LoggingLevel}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class LoggingExceptionHandler implements ExceptionHandler {
+    private final Logger logger;
+
+    public LoggingExceptionHandler(Class ownerType) {
+        this(new Logger(LogFactory.getLog(ownerType), LoggingLevel.ERROR));
+    }
+
+    public LoggingExceptionHandler(Logger logger) {
+        this.logger = logger;
+    }
+
+    public void handleException(Throwable exception) {
+        logger.log(exception.getMessage(), exception);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/LoggingExceptionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java?view=auto&rev=533794
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
Mon Apr 30 09:41:50 2007
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.util.IntrospectionSupport;
+
+import java.util.Map;
+
+/**
+ * A base class for {@link Endpoint} which creates a {@link PollingConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class PollingEndpoint<E extends Exchange> extends DefaultEndpoint<E>
{
+    private Map consumerProperties;
+
+    protected PollingEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    public Map getConsumerProperties() {
+        return consumerProperties;
+    }
+
+    public void setConsumerProperties(Map consumerProperties) {
+        this.consumerProperties = consumerProperties;
+    }
+
+    protected void configureConsumer(Consumer<E> consumer) {
+        if (consumerProperties != null) {
+            IntrospectionSupport.setProperties(consumer, consumerProperties);
+        }
+    }
+
+    public void configureProperties(Map options) {
+        Map consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
+        if (consumerProperties != null) {
+            setConsumerProperties(consumerProperties);
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java?view=auto&rev=533794
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
Mon Apr 30 09:41:50 2007
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * A Strategy pattern for handling exceptions; particularly in asynchronous processes such as consumers
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface ExceptionHandler {
+    
+    /**
+     * Handles the given exception
+     *
+     * @param exception the exception
+     */
+    void handleException(Throwable exception);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExceptionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
(original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileComponent.java
Mon Apr 30 09:41:50 2007
@@ -17,12 +17,12 @@
  */
 package org.apache.camel.component.file;
 
-import java.io.File;
-import java.util.Map;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
-import org.apache.camel.util.IntrospectionSupport;
+
+import java.io.File;
+import java.util.Map;
 
 /**
  * @version $Revision: 523772 $
@@ -37,8 +37,7 @@
 
     protected Endpoint<FileExchange> createEndpoint(String uri, String remaining, Map
parameters) throws Exception {
         File file = new File(remaining);
-        FileEndpoint result =  new FileEndpoint(file, remaining, this,parameters);
+        FileEndpoint result = new FileEndpoint(file, remaining, this);
         return result;
     }
-    
 }

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
Mon Apr 30 09:41:50 2007
@@ -31,8 +31,8 @@
  * @version $Revision: 523016 $
  */
 public class FileConsumer extends PollingConsumer<FileExchange>{
-
     private static final transient Log log=LogFactory.getLog(FileConsumer.class);
+
     private final FileEndpoint endpoint;
     private boolean recursive=true;
     private boolean attemptFileLock=false;

Modified: activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
(original)
+++ activemq/camel/trunk/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java
Mon Apr 30 09:41:50 2007
@@ -17,32 +17,41 @@
  */
 package org.apache.camel.component.file;
 
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.PollingEndpoint;
 import org.apache.camel.util.IntrospectionSupport;
 
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
 /**
  * @version $Revision: 523016 $
  */
-public class FileEndpoint extends DefaultEndpoint<FileExchange> {
+public class FileEndpoint extends PollingEndpoint<FileExchange> {
     private File file;
-    private Map parameters;
-    protected FileEndpoint(File file,String endpointUri, Component component,Map parameters){
-        super(endpointUri,component);
+    private ScheduledExecutorService executor;
+
+    protected FileEndpoint(File file, String endpointUri, FileComponent component) {
+        super(endpointUri, component);
         this.file = file;
-        this.parameters=parameters;
-        IntrospectionSupport.setProperties(this, parameters);
+        this.executor = component.getExecutorService();
     }
 
-   
-    private ScheduledExecutorService executor;
+
+    /**
+     * @return a Producer
+     * @throws Exception
+     * @see org.apache.camel.Endpoint#createProducer()
+     */
+    public Producer<FileExchange> createProducer() throws Exception {
+        Producer<FileExchange> result = new FileProducer(this);
+        return startService(result);
+    }
 
     /**
      * @param file
@@ -50,68 +59,51 @@
      * @throws Exception
      * @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor)
      */
-    public Consumer<FileExchange> createConsumer(Processor<FileExchange> file)
throws Exception{
-        Consumer<FileExchange> result =  new FileConsumer(this, file, getExecutor());
-        IntrospectionSupport.setProperties(result, parameters);
-        return result;
+    public Consumer<FileExchange> createConsumer(Processor<FileExchange> file)
throws Exception {
+        Consumer<FileExchange> result = new FileConsumer(this, file, getExecutor());
+        configureConsumer(result);
+        return startService(result);
     }
 
     /**
-     * @param file 
+     * @param file
      * @return a FileExchange
      * @see org.apache.camel.Endpoint#createExchange()
      */
-    public FileExchange createExchange(File file){
-        return new FileExchange(getContext(),file);
+    public FileExchange createExchange(File file) {
+        return new FileExchange(getContext(), file);
     }
-    
+
     /**
      * @return an Exchange
      * @see org.apache.camel.Endpoint#createExchange()
      */
-    public FileExchange createExchange(){
+    public FileExchange createExchange() {
         return createExchange(this.file);
     }
 
-
-    /**
-     * @return a Producer
-     * @throws Exception
-     * @see org.apache.camel.Endpoint#createProducer()
-     */
-    public Producer<FileExchange> createProducer() throws Exception{
-        Producer<FileExchange> result =  new FileProducer(this);
-        IntrospectionSupport.setProperties(result, parameters);
-        return result;
-    }
-
-    
     /**
      * @return the executor
      */
-    public synchronized ScheduledExecutorService getExecutor(){
-        if (this.executor==null) {
-            this.executor=new ScheduledThreadPoolExecutor(10);
+    public synchronized ScheduledExecutorService getExecutor() {
+        if (this.executor == null) {
+            this.executor = new ScheduledThreadPoolExecutor(10);
         }
         return executor;
     }
 
-    
     /**
      * @param executor the executor to set
      */
-    public synchronized void setExecutor(ScheduledExecutorService executor){
-        this.executor=executor;
+    public synchronized void setExecutor(ScheduledExecutorService executor) {
+        this.executor = executor;
     }
-    
+
     public File getFile() {
         return file;
     }
 
-	public boolean isSingleton() {
-		return true;
-	}
-  
-    
-
+    public boolean isSingleton() {
+        return true;
+    }
 }

Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
(original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
Mon Apr 30 09:41:50 2007
@@ -56,14 +56,6 @@
     @Override
     protected Endpoint<Exchange> createEndpoint(String uri, String path, Map options)
throws Exception {
         JpaEndpoint endpoint = new JpaEndpoint(uri, this);
-        Map consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
-        if (consumerProperties != null) {
-            endpoint.setConsumerProperties(consumerProperties);
-        }
-        Map emProperties = IntrospectionSupport.extractProperties(options, "emf.");
-        if (emProperties != null) {
-            endpoint.setEntityManagerProperties(emProperties);
-        }
 
         // lets interpret the next string as a class
         if (path != null) {

Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java?view=diff&rev=533794&r1=533793&r2=533794
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
(original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
Mon Apr 30 09:41:50 2007
@@ -26,6 +26,7 @@
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.PollingEndpoint;
 import org.apache.camel.util.IntrospectionSupport;
 import org.springframework.orm.jpa.JpaTemplate;
 
@@ -37,14 +38,13 @@
 /**
  * @version $Revision$
  */
-public class JpaEndpoint extends DefaultEndpoint<Exchange> {
+public class JpaEndpoint extends PollingEndpoint<Exchange> {
     private EntityManagerFactory entityManagerFactory;
     private String persistenceUnit = "camel";
     private JpaTemplate template;
     private Expression<Exchange> producerExpression;
     private int maximumResults = -1;
     private Class<?> entityType;
-    private Map consumerProperties;
     private Map entityManagerProperties;
 
     public JpaEndpoint(String uri, JpaComponent component) {
@@ -62,12 +62,23 @@
 
     public Consumer<Exchange> createConsumer(Processor<Exchange> processor) throws
Exception {
         JpaConsumer consumer = new JpaConsumer(this, processor, getExecutorService());
-        if (consumerProperties != null) {
-            IntrospectionSupport.setProperties(consumer, consumerProperties);
-        }
+        configureConsumer(consumer);
         return startService(consumer);
     }
 
+    @Override
+    public void configureProperties(Map options) {
+        super.configureProperties(options);
+        Map emProperties = IntrospectionSupport.extractProperties(options, "emf.");
+        if (emProperties != null) {
+            setEntityManagerProperties(emProperties);
+        }
+    }
+
+    public boolean isSingleton() {
+		return false;
+	}
+
     // Properties
     //-------------------------------------------------------------------------
     public JpaTemplate getTemplate() {
@@ -108,14 +119,6 @@
         this.entityType = entityType;
     }
 
-    public Map getConsumerProperties() {
-        return consumerProperties;
-    }
-
-    public void setConsumerProperties(Map consumerProperties) {
-        this.consumerProperties = consumerProperties;
-    }
-
     public EntityManagerFactory getEntityManagerFactory() {
         if (entityManagerFactory == null) {
             entityManagerFactory = createEntityManagerFactory();
@@ -190,9 +193,4 @@
             };
         }
     }
-    
-	public boolean isSingleton() {
-		return true;
-	}
-
 }



Mime
View raw message