camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1140159 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/
Date Mon, 27 Jun 2011 13:53:36 GMT
Author: davsclaus
Date: Mon Jun 27 13:53:36 2011
New Revision: 1140159

URL: http://svn.apache.org/viewvc?rev=1140159&view=rev
Log:
CAMEL-3655: Refactored having reusable code in file consumer to support both scheduled consumer
and polling consumer.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1140159&r1=1140158&r2=1140159&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Mon Jun 27 13:53:36 2011
@@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements
BatchConsumer, ShutdownAware {
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
+    protected final ProcessFile processFile;
     protected GenericFileEndpoint<T> endpoint;
     protected GenericFileOperations<T> operations;
-    protected boolean loggedIn;
     protected String fileExpressionResult;
     protected int maxMessagesPerPoll;
     protected volatile ShutdownRunningTask shutdownRunningTask;
@@ -54,6 +54,13 @@ public abstract class GenericFileConsume
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.operations = operations;
+        this.processFile = new ProcessFile(this);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public GenericFileEndpoint<T> getEndpoint() {
+        return (GenericFileEndpoint<T>) super.getEndpoint();
     }
 
     /**
@@ -74,7 +81,7 @@ public abstract class GenericFileConsume
 
         // gather list of files to process
         List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
-        String name = endpoint.getConfiguration().getDirectory();
+        String name = getEndpoint().getConfiguration().getDirectory();
 
         // time how long time it takes to poll
         StopWatch stop = new StopWatch();
@@ -90,21 +97,21 @@ public abstract class GenericFileConsume
         }
 
         // sort files using file comparator if provided
-        if (endpoint.getSorter() != null) {
-            Collections.sort(files, endpoint.getSorter());
+        if (getEndpoint().getSorter() != null) {
+            Collections.sort(files, getEndpoint().getSorter());
         }
 
         // sort using build in sorters so we can use expressions
         LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
         for (GenericFile<T> file : files) {
-            Exchange exchange = endpoint.createExchange(file);
-            endpoint.configureExchange(exchange);
-            endpoint.configureMessage(file, exchange.getIn());
+            Exchange exchange = getEndpoint().createExchange(file);
+            getEndpoint().configureExchange(exchange);
+            getEndpoint().configureMessage(file, exchange.getIn());
             exchanges.add(exchange);
         }
         // sort files using exchange comparator if provided
-        if (endpoint.getSortBy() != null) {
-            Collections.sort(exchanges, endpoint.getSortBy());
+        if (getEndpoint().getSortBy() != null) {
+            Collections.sort(exchanges, getEndpoint().getSortBy());
         }
 
         // consume files one by one
@@ -156,7 +163,7 @@ public abstract class GenericFileConsume
             Exchange exchange = (Exchange) exchanges.poll();
             GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
             String key = file.getAbsoluteFilePath();
-            endpoint.getInProgressRepository().remove(key);
+            getEndpoint().getInProgressRepository().remove(key);
         }
 
         return total;
@@ -251,83 +258,22 @@ public abstract class GenericFileConsume
     }
 
     /**
+     * Gets the operations to be used
+     *
+     * @return the operations
+     */
+    public GenericFileOperations<T> getOperations() {
+        return operations;
+    }
+
+    /**
      * Processes the exchange
      *
      * @param exchange the exchange
      */
     protected void processExchange(final Exchange exchange) {
-        GenericFile<T> file = getExchangeFileProperty(exchange);
-        log.trace("Processing file: {}", file);
-
-        // must extract the absolute name before the begin strategy as the file could potentially
be pre moved
-        // and then the file name would be changed
-        String absoluteFileName = file.getAbsoluteFilePath();
-
-        // check if we can begin processing the file
-        try {
-            final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
-
-            boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
-            if (!begin) {
-                log.debug(endpoint + " cannot begin processing file: {}", file);
-                // begin returned false, so remove file from the in progress list as its
no longer in progress
-                endpoint.getInProgressRepository().remove(absoluteFileName);
-                return;
-            }
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug(endpoint + " cannot begin processing file: " + file + " due to:
" + e.getMessage(), e);
-            }
-            endpoint.getInProgressRepository().remove(absoluteFileName);
-            return;
-        }
-
-        // must use file from exchange as it can be updated due the
-        // preMoveNamePrefix/preMoveNamePostfix options
-        final GenericFile<T> target = getExchangeFileProperty(exchange);
-        // must use full name when downloading so we have the correct path
-        final String name = target.getAbsoluteFilePath();
-        try {
-            // retrieve the file using the stream
-            log.trace("Retrieving file: {} from: {}", name, endpoint);
-
-            // retrieve the file and check it was a success
-            boolean retrieved = operations.retrieveFile(name, exchange);
-            if (!retrieved) {
-                // throw exception to handle the problem with retrieving the file
-                // then if the method return false or throws an exception is handled the
same in here
-                // as in both cases an exception is being thrown
-                throw new GenericFileOperationFailedException("Cannot retrieve file: " +
file + " from: " + endpoint);
-            }
-
-            log.trace("Retrieved file: {} from: {}", name, endpoint);
-
-            // register on completion callback that does the completion strategies
-            // (for instance to move the file after we have processed it)
-            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations,
target, absoluteFileName));
-
-            log.debug("About to process file: {} using exchange: {}", target, exchange);
-
-            // process the exchange using the async consumer to support async routing engine
-            // which can be supported by this file consumer as all the done work is
-            // provided in the GenericFileOnCompletion
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // noop
-                    if (log.isTraceEnabled()) {
-                        log.trace("Done processing file: {} {}", target, doneSync ? "synchronously"
: "asynchronously");
-                    }
-                }
-            });
-
-        } catch (Exception e) {
-            // remove file from the in progress list due to failure
-            // (cannot be in finally block due to GenericFileOnCompletion will remove it
-            // from in progress when it takes over and processes the file, which may happen
-            // by another thread at a later time. So its only safe to remove it if there
was an exception)
-            endpoint.getInProgressRepository().remove(absoluteFileName);
-            handleException(e);
-        }
+        // let the process do the work
+        processFile.processExchange(exchange);
     }
 
     /**
@@ -341,7 +287,7 @@ public abstract class GenericFileConsume
         if (!isMatched(file, isDirectory)) {
             log.trace("File did not match. Will skip this file: {}", file);
             return false;
-        } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath()))
{
+        } else if (getEndpoint().isIdempotent() && getEndpoint().getIdempotentRepository().contains(file.getAbsoluteFilePath()))
{
             log.trace("This consumer is idempotent and the file has been consumed before.
Will skip this file: {}", file);
             return false;
         }
@@ -382,26 +328,26 @@ public abstract class GenericFileConsume
             return true;
         }
 
-        if (endpoint.getFilter() != null) {
-            if (!endpoint.getFilter().accept(file)) {
+        if (getEndpoint().getFilter() != null) {
+            if (!getEndpoint().getFilter().accept(file)) {
                 return false;
             }
         }
 
-        if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
-            if (name.matches(endpoint.getExclude())) {
+        if (ObjectHelper.isNotEmpty(getEndpoint().getExclude())) {
+            if (name.matches(getEndpoint().getExclude())) {
                 return false;
             }
         }
 
-        if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
-            if (!name.matches(endpoint.getInclude())) {
+        if (ObjectHelper.isNotEmpty(getEndpoint().getInclude())) {
+            if (!name.matches(getEndpoint().getInclude())) {
                 return false;
             }
         }
 
         // use file expression for a simple dynamic file filter
-        if (endpoint.getFileName() != null) {
+        if (getEndpoint().getFileName() != null) {
             evaluateFileExpression();
             if (fileExpressionResult != null) {
                 if (!name.equals(fileExpressionResult)) {
@@ -411,19 +357,19 @@ public abstract class GenericFileConsume
         }
 
         // if done file name is enabled, then the file is only valid if a done file exists
-        if (endpoint.getDoneFileName() != null) {
+        if (getEndpoint().getDoneFileName() != null) {
             // done file must be in same path as the file
-            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
-            ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
+            String doneFileName = getEndpoint().createDoneFileName(file.getAbsoluteFilePath());
+            ObjectHelper.notEmpty(doneFileName, "doneFileName", getEndpoint());
 
             // is it a done file name?
-            if (endpoint.isDoneFile(file.getFileNameOnly())) {
+            if (getEndpoint().isDoneFile(file.getFileNameOnly())) {
                 log.trace("Skipping done file: {}", file);
                 return false;
             }
 
             // the file is only valid if the done file exist
-            if (!operations.existsFile(doneFileName)) {
+            if (!getOperations().existsFile(doneFileName)) {
                 log.trace("Done file: {} does not exist", doneFileName);
                 return false;
             }
@@ -440,13 +386,13 @@ public abstract class GenericFileConsume
      */
     protected boolean isInProgress(GenericFile<T> file) {
         String key = file.getAbsoluteFilePath();
-        return !endpoint.getInProgressRepository().add(key);
+        return !getEndpoint().getInProgressRepository().add(key);
     }
 
     private void evaluateFileExpression() {
         if (fileExpressionResult == null) {
             // create a dummy exchange as Exchange is needed for expression evaluation
-            Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
+            Exchange dummy = new DefaultExchange(getEndpoint().getCamelContext());
             fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
         }
     }
@@ -461,6 +407,35 @@ public abstract class GenericFileConsume
         super.doStart();
         
         // prepare on startup
-        endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
+        getEndpoint().getGenericFileProcessStrategy().prepareOnStartup(getOperations(), getEndpoint());
     }
+
+    /**
+     * Class the processes the exchange when a file has been polled.
+     */
+    private class ProcessFile extends GenericFileConsumerSupport<T> {
+
+        public ProcessFile(GenericFileConsumer<T> consumer) {
+            super(consumer);
+        }
+
+        @Override
+        void handleExceptionStrategy(Exception e) {
+            // handle the exception on the consumer
+            handleException(e);
+        }
+
+        @Override
+        void processFileStrategy(Exchange exchange) {
+            // process the exchange using the async consumer to support async routing engine
+            // which can be supported by this file consumer as all the done work is
+            // provided in the GenericFileOnCompletion
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // noop
+                }
+            });
+        }
+    }
+
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java?rev=1140159&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java
Mon Jun 27 13:53:36 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Support class for both a regular {@link org.apache.camel.Consumer}
+ * and a {@link org.apache.camel.PollingConsumer} using the file component.
+ * <p/>
+ * This class contains shared code between the two kind of consumers, to reuse logic.
+ * <p/>
+ * The method {@link #processExchange(org.apache.camel.Exchange)} should be invoked to
+ * process the consumed file. Then custom implementations can implement the strategy
+ * method for their custom logic.
+ */
+public abstract class GenericFileConsumerSupport<T> {
+    protected final transient Logger log = LoggerFactory.getLogger(getClass());
+    protected final GenericFileConsumer<T> consumer;
+
+    public GenericFileConsumerSupport(GenericFileConsumer<T> consumer) {
+        this.consumer = consumer;
+    }
+
+    /**
+     * Strategy to process the consumed file
+     *
+     * @param exchange the exchange with the file details
+     */
+    abstract void processFileStrategy(Exchange exchange);
+
+    /**
+     * Strategy to handle the exception thrown that occurred while processing the consumer
file
+     * <p/>
+     * Implementations will usually delegate to a {@link org.apache.camel.spi.ExceptionHandler}
+     * to handle the given exception.
+     *
+     * @param e the caused exception
+     */
+    abstract void handleExceptionStrategy(Exception e);
+
+    /**
+     * Processes the exchange.
+     * <p/>
+     * This method should be invoked to process the consumed file
+     *
+     * @param exchange the exchange
+     */
+    protected void processExchange(final Exchange exchange) {
+        GenericFile<T> file = getExchangeFileProperty(exchange);
+        log.trace("Processing file: {}", file);
+
+        // must extract the absolute name before the begin strategy as the file could potentially
be pre moved
+        // and then the file name would be changed
+        String absoluteFileName = file.getAbsoluteFilePath();
+
+        // check if we can begin processing the file
+        try {
+            final GenericFileProcessStrategy<T> processStrategy = consumer.getEndpoint().getGenericFileProcessStrategy();
+
+            boolean begin = processStrategy.begin(consumer.getOperations(), consumer.getEndpoint(),
exchange, file);
+            if (!begin) {
+                log.debug(consumer.getEndpoint() + " cannot begin processing file: {}", file);
+                // begin returned false, so remove file from the in progress list as its
no longer in progress
+                consumer.getEndpoint().getInProgressRepository().remove(absoluteFileName);
+                return;
+            }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug(consumer.getEndpoint() + " cannot begin processing file: " + file
+ " due to: " + e.getMessage(), e);
+            }
+            consumer.getEndpoint().getInProgressRepository().remove(absoluteFileName);
+            return;
+        }
+
+        // must use file from exchange as it can be updated due the
+        // preMoveNamePrefix/preMoveNamePostfix options
+        final GenericFile<T> target = getExchangeFileProperty(exchange);
+        // must use full name when downloading so we have the correct path
+        final String name = target.getAbsoluteFilePath();
+        try {
+            // retrieve the file using the stream
+            log.trace("Retrieving file: {} from: {}", name, consumer.getEndpoint());
+
+            // retrieve the file and check it was a success
+            boolean retrieved = consumer.getOperations().retrieveFile(name, exchange);
+            if (!retrieved) {
+                // throw exception to handle the problem with retrieving the file
+                // then if the method return false or throws an exception is handled the
same in here
+                // as in both cases an exception is being thrown
+                throw new GenericFileOperationFailedException("Cannot retrieve file: " +
file + " from: " + consumer.getEndpoint());
+            }
+
+            log.trace("Retrieved file: {} from: {}", name, consumer.getEndpoint());
+
+            // register on completion callback that does the completion strategies
+            // (for instance to move the file after we have processed it)
+            exchange.addOnCompletion(new GenericFileOnCompletion<T>(consumer.getEndpoint(),
consumer.getOperations(), target, absoluteFileName));
+
+            log.debug("About to process file: {} using exchange: {}", target, exchange);
+
+            // process the file
+            processFileStrategy(exchange);
+
+        } catch (Exception e) {
+            // remove file from the in progress list due to failure
+            // (cannot be in finally block due to GenericFileOnCompletion will remove it
+            // from in progress when it takes over and processes the file, which may happen
+            // by another thread at a later time. So its only safe to remove it if there
was an exception)
+            consumer.getEndpoint().getInProgressRepository().remove(absoluteFileName);
+            handleExceptionStrategy(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
+        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
+    }
+
+}

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1140159&r1=1140158&r2=1140159&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
(original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Mon Jun 27 13:53:36 2011
@@ -39,11 +39,14 @@ public abstract class RemoteFileConsumer
         return (RemoteFileEndpoint<T>) super.getEndpoint();
     }
 
-    protected RemoteFileOperations getOperations() {
+    public RemoteFileOperations getOperations() {
         return (RemoteFileOperations) operations;
     }
 
     protected boolean prePollCheck() throws Exception {
+        if (log.isTraceEnabled()) {
+            log.trace("prePollCheck on " + getEndpoint().getConfiguration().remoteServerInformation());
+        }
         try {
             if (getEndpoint().getMaximumReconnectAttempts() > 0) {
                 // only use recoverable if we are allowed any re-connect attempts
@@ -71,6 +74,9 @@ public abstract class RemoteFileConsumer
 
     @Override
     protected void postPollCheck() {
+        if (log.isTraceEnabled()) {
+            log.trace("postPollCheck on " + getEndpoint().getConfiguration().remoteServerInformation());
+        }
         if (getEndpoint().isDisconnect()) {
             log.trace("postPollCheck disconnect from: {}", getEndpoint());
             disconnect();



Mime
View raw message