camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r729480 [2/4] - in /activemq/camel/trunk/components/camel-ftp/src: main/java/org/apache/camel/component/file/remote/ main/java/org/apache/camel/component/file/remote/strategy/ main/resources/META-INF/services/org/apache/camel/component/ tes...
Date Fri, 26 Dec 2008 10:53:12 GMT
Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=729480&r1=729479&r2=729480&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java Fri Dec 26 02:53:10 2008
@@ -16,62 +16,281 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
+import org.apache.camel.Expression;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.file.FileComponent;
 import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.language.simple.FileLanguage;
+import org.apache.camel.util.FactoryFinder;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.UuidGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-public abstract class RemoteFileEndpoint<T extends RemoteFileExchange> extends ScheduledPollEndpoint {
-    private RemoteFileBinding binding;
+/**
+ * Remote file endpoint.
+ */
+public class RemoteFileEndpoint extends ScheduledPollEndpoint {
+    private static final transient Log LOG = LogFactory.getLog(RemoteFileEndpoint.class);
+    private static final transient String DEFAULT_STRATEGYFACTORY_CLASS =
+            "org.apache.camel.component.file.remote.strategy.RemoteFileProcessStrategyFactory";
+
+    private RemoteFileProcessStrategy remoteFileProcessStrategy;
+    private RemoteFileOperations remoteFileOperations;
     private RemoteFileConfiguration configuration;
+    private boolean noop;
+    private String tempPrefix;
+    private String moveNamePrefix;
+    private String moveNamePostfix;
+    private String preMoveNamePrefix;
+    private String preMoveNamePostfix;
+    private String excludedNamePrefix;
+    private String excludedNamePostfix;
+    private boolean recursive;
+    private String regexPattern;
+    private boolean setNames = true;
+    private boolean delete;
+    private Expression expression;
+    private Expression preMoveExpression;
+    private RemoteFileFilter filter;
+    private Comparator<RemoteFile> sorter;
+    private Comparator<RemoteFileExchange> sortBy;
+    private RemoteFileExclusiveReadLockStrategy exclusiveReadLockStrategy;
+    private String readLock = "none";
+    private long readLockTimeout;
 
-    public RemoteFileEndpoint(String uri, RemoteFileComponent component, RemoteFileConfiguration configuration) {
+    public RemoteFileEndpoint(String uri, RemoteFileComponent component, RemoteFileOperations remoteFileOperations, RemoteFileConfiguration configuration) {
         super(uri, component);
+        this.remoteFileOperations = remoteFileOperations;
         this.configuration = configuration;
     }
 
-    protected RemoteFileEndpoint(String endpointUri, RemoteFileConfiguration configuration) {
-        super(endpointUri);
-        this.configuration = configuration;
+    public RemoteFileExchange createExchange() {
+        return new RemoteFileExchange(getCamelContext(), getExchangePattern(), null);
+    }
+
+    public RemoteFileExchange createExchange(RemoteFile remote) {
+        return new RemoteFileExchange(getCamelContext(), getExchangePattern(), remote);
     }
 
-    protected RemoteFileEndpoint(String endpointUri) {
-        this(endpointUri, new RemoteFileConfiguration());
+    public RemoteFileProducer createProducer() throws Exception {
+        return new RemoteFileProducer(this, remoteFileOperations);
     }
 
-    protected RemoteFileBinding createRemoteFileBinding() {
-        return new RemoteFileBinding();
+    public RemoteFileConsumer createConsumer(Processor processor) throws Exception {
+        String protocol = getConfiguration().getProtocol();
+        ObjectHelper.notEmpty(protocol, "protocol");
+
+        RemoteFileConsumer consumer = null;
+        if ("ftp".equals(protocol)) {
+            consumer = new FtpConsumer(this, processor, remoteFileOperations);
+        } else if ("sftp".equals(protocol)) {
+            consumer = new SftpConsumer(this, processor, remoteFileOperations);
+        } else {
+            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
+        }
+
+        if (isDelete() && (getMoveNamePrefix() != null || getMoveNamePostfix() != null || getExpression() != null)) {
+            throw new IllegalArgumentException("You cannot set delete=true and a moveNamePrefix, moveNamePostfix or expression option");
+        }
+
+        configureConsumer(consumer);
+        return consumer;
     }
 
-    public Exchange createExchange() {
-        return (T) new RemoteFileExchange(getCamelContext(), getExchangePattern(), getBinding());
+    public boolean isSingleton() {
+        return true;
     }
 
-    public Exchange createExchange(ExchangePattern pattern) {
-        return (T) new RemoteFileExchange(getCamelContext(), pattern, getBinding());
+    /**
+     * Return the file name that will be auto-generated for the given message if none is provided
+     */
+    public String getGeneratedFileName(Message message) {
+        return getFileFriendlyMessageId(message.getMessageId());
     }
 
-    public T createExchange(String fullFileName, String fileName, long fileLength, ByteArrayOutputStream outputStream) {
-        return (T) new RemoteFileExchange(getCamelContext(), getExchangePattern(), getBinding(),
-                getConfiguration().getHost(), fullFileName, fileName, fileLength, outputStream);
+    protected String getFileFriendlyMessageId(String id) {
+        return UuidGenerator.generateSanitizedId(id);
     }
 
-    public RemoteFileBinding getBinding() {
-        if (binding == null) {
-            binding = createRemoteFileBinding();
+    public RemoteFileProcessStrategy getRemoteFileProcessStrategy() {
+        if (remoteFileProcessStrategy == null) {
+            remoteFileProcessStrategy = createRemoteFileStrategy();
+            LOG.debug("Using remote file process strategy: " + remoteFileProcessStrategy);
         }
-        return binding;
+        return remoteFileProcessStrategy;
     }
 
-    public void setBinding(RemoteFileBinding binding) {
-        this.binding = binding;
+    public void setRemoteFileProcessStrategy(RemoteFileProcessStrategy remoteFileProcessStrategy) {
+        this.remoteFileProcessStrategy = remoteFileProcessStrategy;
     }
 
-    public boolean isSingleton() {
-        return true;
+    public boolean isNoop() {
+        return noop;
+    }
+
+    public void setNoop(boolean noop) {
+        this.noop = noop;
+    }
+
+    public String getMoveNamePrefix() {
+        return moveNamePrefix;
+    }
+
+    public void setMoveNamePrefix(String moveNamePrefix) {
+        this.moveNamePrefix = moveNamePrefix;
+    }
+
+    public String getMoveNamePostfix() {
+        return moveNamePostfix;
+    }
+
+    public void setMoveNamePostfix(String moveNamePostfix) {
+        this.moveNamePostfix = moveNamePostfix;
+    }
+
+    public String getPreMoveNamePrefix() {
+        return preMoveNamePrefix;
+    }
+
+    public void setPreMoveNamePrefix(String preMoveNamePrefix) {
+        this.preMoveNamePrefix = preMoveNamePrefix;
+    }
+
+    public String getPreMoveNamePostfix() {
+        return preMoveNamePostfix;
+    }
+
+    public void setPreMoveNamePostfix(String preMoveNamePostfix) {
+        this.preMoveNamePostfix = preMoveNamePostfix;
+    }
+
+    public String getExcludedNamePrefix() {
+        return excludedNamePrefix;
+    }
+
+    public void setExcludedNamePrefix(String excludedNamePrefix) {
+        this.excludedNamePrefix = excludedNamePrefix;
+    }
+
+    public String getExcludedNamePostfix() {
+        return excludedNamePostfix;
+    }
+
+    public void setExcludedNamePostfix(String excludedNamePostfix) {
+        this.excludedNamePostfix = excludedNamePostfix;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
+    public String getRegexPattern() {
+        return regexPattern;
+    }
+
+    public void setRegexPattern(String regexPattern) {
+        this.regexPattern = regexPattern;
+    }
+
+    public boolean isSetNames() {
+        return setNames;
+    }
+
+    public void setSetNames(boolean setNames) {
+        this.setNames = setNames;
+    }
+
+    public boolean isDelete() {
+        return delete;
+    }
+
+    public void setDelete(boolean delete) {
+        this.delete = delete;
+    }
+
+    public Expression getExpression() {
+        return expression;
+    }
+
+    public void setExpression(Expression expression) {
+        this.expression = expression;
+    }
+
+    /**
+     * Sets the expression based on {@link org.apache.camel.language.simple.FileLanguage}
+     */
+    public void setExpression(String fileLanguageExpression) {
+        this.expression = FileLanguage.file(fileLanguageExpression);
+    }
+
+    public Expression getPreMoveExpression() {
+        return preMoveExpression;
+    }
+
+    public void setPreMoveExpression(Expression preMoveExpression) {
+        this.preMoveExpression = preMoveExpression;
+    }
+
+    /**
+     * Sets the pre move expression based on {@link org.apache.camel.language.simple.FileLanguage}
+     */
+    public void setPreMoveExpression(String fileLanguageExpression) {
+        this.preMoveExpression = FileLanguage.file(fileLanguageExpression);
+    }
+
+    public RemoteFileFilter getFilter() {
+        return filter;
+    }
+
+    public void setFilter(RemoteFileFilter filter) {
+        this.filter = filter;
+    }
+
+    public Comparator<RemoteFile> getSorter() {
+        return sorter;
+    }
+
+    public void setSorter(Comparator<RemoteFile> sorter) {
+        this.sorter = sorter;
+    }
+
+    public Comparator<RemoteFileExchange> getSortBy() {
+        return sortBy;
+    }
+
+    public void setSortBy(Comparator<RemoteFileExchange> sortBy) {
+        this.sortBy = sortBy;
+    }
+
+    public void setSortBy(String expression) {
+        setSortBy(expression, false);
+    }
+
+    public void setSortBy(String expression, boolean reverse) {
+        setSortBy(DefaultRemoteFileSorter.sortByFileLanguage(expression, reverse));
+    }
+
+    public String getTempPrefix() {
+        return tempPrefix;
+    }
+
+    /**
+     * Enables and uses temporary prefix when writing files, after write it will be renamed to the correct name.
+     */
+    public void setTempPrefix(String tempPrefix) {
+        this.tempPrefix = tempPrefix;
     }
 
     public RemoteFileConfiguration getConfiguration() {
@@ -82,14 +301,121 @@
         this.configuration = configuration;
     }
 
+    public RemoteFileExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
+        return exclusiveReadLockStrategy;
+    }
+
+    public void setExclusiveReadLockStrategy(RemoteFileExclusiveReadLockStrategy exclusiveReadLockStrategy) {
+        this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
+    }
+
+    public String getReadLock() {
+        return readLock;
+    }
+
+    public void setReadLock(String readLock) {
+        this.readLock = readLock;
+    }
+
+    public long getReadLockTimeout() {
+        return readLockTimeout;
+    }
+
+    public void setReadLockTimeout(long readLockTimeout) {
+        this.readLockTimeout = readLockTimeout;
+    }
+
     /**
-      * Return the file name that will be auto-generated for the given message if none is provided
-      */
-    public String getGeneratedFileName(Message message) {
-        return getFileFriendlyMessageId(message.getMessageId());
+     * Should the file be moved after consuming?
+     */
+    public boolean isMoveFile() {
+        return moveNamePostfix != null || moveNamePrefix != null || preMoveNamePostfix != null || preMoveNamePrefix != null || expression != null;
     }
 
-    protected String getFileFriendlyMessageId(String id) {
-        return UuidGenerator.generateSanitizedId(id);
+    /**
+     * Returns human readable server information for logging purpose
+     */
+    public String remoteServerInformation() {
+        return configuration.remoteServerInformation();
+    }
+
+    /**
+     * Configures the given message with the file which sets the body to the file object
+     * and sets the {@link FileComponent#HEADER_FILE_NAME} header.
+     */
+    public void configureMessage(RemoteFile file, Message message) {
+        message.setBody(file);
+        message.setHeader(FileComponent.HEADER_FILE_NAME, file.getRelativeFileName());
+    }
+
+    /**
+     * A strategy method to lazily create the file strategy
+     */
+    protected RemoteFileProcessStrategy createRemoteFileStrategy() {
+        Class<?> factory = null;
+        try {
+            FactoryFinder finder = new FactoryFinder("META-INF/services/org/apache/camel/component/");
+            factory = finder.findClass("ftp", "strategy.factory.");
+        } catch (ClassNotFoundException e) {
+            LOG.debug("'strategy.factory.class' not found", e);
+        } catch (IOException e) {
+            LOG.debug("No strategy factory defined in 'META-INF/services/org/apache/camel/component/'", e);
+        }
+
+        if (factory == null) {
+            // use default
+            factory = ObjectHelper.loadClass(DEFAULT_STRATEGYFACTORY_CLASS);
+            if (factory == null) {
+                throw new TypeNotPresentException("RemoteFileProcessStrategyFactory class not found", null);
+            }
+        }
+
+        try {
+            Method factoryMethod = factory.getMethod("createRemoteFileProcessStrategy", Map.class);
+            return (RemoteFileProcessStrategy) ObjectHelper.invokeMethod(factoryMethod, null, getParamsAsMap());
+        } catch (NoSuchMethodException e) {
+            throw new TypeNotPresentException(factory.getSimpleName()
+                    + ".createRemoteFileProcessStrategy(RemoteFileEndpoint endpoint) method not found", e);
+        }
     }
+
+    protected Map<String, Object> getParamsAsMap() {
+        Map<String, Object> params = new HashMap<String, Object>();
+
+        if (isNoop()) {
+            params.put("noop", Boolean.toString(true));
+        }
+        if (isDelete()) {
+            params.put("delete", Boolean.toString(true));
+        }
+        if (moveNamePrefix != null) {
+            params.put("moveNamePrefix", moveNamePrefix);
+        }
+        if (moveNamePostfix != null) {
+            params.put("moveNamePostfix", moveNamePostfix);
+        }
+        if (preMoveNamePrefix != null) {
+            params.put("preMoveNamePrefix", preMoveNamePrefix);
+        }
+        if (preMoveNamePostfix != null) {
+            params.put("preMoveNamePostfix", preMoveNamePostfix);
+        }
+        if (expression != null) {
+            params.put("expression", expression);
+        }
+        if (preMoveExpression != null) {
+            params.put("preMoveExpression", preMoveExpression);
+        }
+        if (exclusiveReadLockStrategy != null) {
+            params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy);
+        }
+        if (readLock != null) {
+            params.put("readLock", readLock);
+        }
+        if (readLockTimeout > 0) {
+            params.put("readLockTimeout", Long.valueOf(readLockTimeout));
+        }
+        return params;
+    }
+
 }

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExchange.java?rev=729480&r1=729479&r2=729480&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExchange.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExchange.java Fri Dec 26 02:53:10 2008
@@ -16,31 +16,56 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.ByteArrayOutputStream;
-
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.DefaultExchange;
 
-public class RemoteFileExchange<T extends RemoteFileBinding> extends DefaultExchange {
-    private T binding;
+public class RemoteFileExchange extends DefaultExchange {
+    private RemoteFile remoteFile;
 
-    public RemoteFileExchange(CamelContext context, ExchangePattern pattern, T binding) {
+    public RemoteFileExchange(CamelContext context, ExchangePattern pattern, RemoteFile remoteFile) {
         super(context, pattern);
-        this.binding = binding;
+        setRemoteFile(remoteFile);
+    }
+
+    public RemoteFileExchange(DefaultExchange parent, RemoteFile remoteFile) {
+        super(parent);
+        setRemoteFile(remoteFile);
     }
 
-    public RemoteFileExchange(CamelContext context, ExchangePattern pattern, T binding, String host, 
-                              String fullFileName, String fileName, long fileLength, ByteArrayOutputStream outputStream) {
-        this(context, pattern, binding);
-        setIn(new RemoteFileMessage(host, fullFileName, fileName, fileLength, outputStream));
+    public RemoteFile getRemoteFile() {
+        return remoteFile;
     }
 
-    public T getBinding() {
-        return binding;
+    public void setRemoteFile(RemoteFile remoteFile) {
+        setIn(new RemoteFileMessage(remoteFile));
+        this.remoteFile = remoteFile;
+        populateHeaders(remoteFile);
     }
 
-    public void setBinding(T binding) {
-        this.binding = binding;
+    public Exchange newInstance() {
+        return new RemoteFileExchange(this, remoteFile);
     }
+
+    protected void populateHeaders(RemoteFile remoteFile) {
+        if (remoteFile != null) {
+            getIn().setHeader("file.remote.host", remoteFile.getHostname());
+            getIn().setHeader("file.remote.absoluteName", remoteFile.getAbsolutelFileName());
+            getIn().setHeader("file.remote.relativeName", remoteFile.getRelativeFileName());
+            getIn().setHeader("file.remote.name", remoteFile.getFileName());
+
+            getIn().setHeader("CamelFileName", remoteFile.getFileName());
+            getIn().setHeader("CamelFilePath", remoteFile.getAbsolutelFileName());
+            // set the parent if there is a parent folder
+            if (remoteFile.getAbsolutelFileName() != null && remoteFile.getAbsolutelFileName().indexOf("/") != -1) {
+                String parent = remoteFile.getAbsolutelFileName().substring(0, remoteFile.getAbsolutelFileName().lastIndexOf("/"));
+                getIn().setHeader("CamelFileParent", parent);
+            }
+            if (remoteFile.getFileLength() > 0) {
+                getIn().setHeader("CamelFileLength", new Long(remoteFile.getFileLength()));
+            }
+        }
+    }
+
 }

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExclusiveReadLockStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExclusiveReadLockStrategy.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExclusiveReadLockStrategy.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileExclusiveReadLockStrategy.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+/**
+ * Strategy for acquiring exclusive read locks for files to be consumed.
+ * After granting the read lock it is realeased, we just want to make sure that when we start
+ * consuming the file its not currently in progress of being written by third party.
+ * <p/>
+ * Camel supports out of the box the following strategies:
+ * <ul>
+ *   <li>RemoteFileRenameExclusiveReadLockStrategy waiting until its possible to rename the file.</li>
+ * </ul>
+ */
+public interface RemoteFileExclusiveReadLockStrategy {
+
+    /**
+     * Acquires exclusive read lock to the file.
+     *
+     * @param ftp ftp file operations
+     * @param file the remote file
+     * @return <tt>true</tt> if read lock was acquired. If <tt>false</tt> Camel will skip the file and
+     *         try it on the next poll
+     */
+    boolean acquireExclusiveReadLock(RemoteFileOperations ftp, RemoteFile file);
+}

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileFilter.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileFilter.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileFilter.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+/**
+ * A filter for {@link RemoteFile}.
+ */
+public interface RemoteFileFilter {
+
+    /**
+     * Tests whether or not the specified remote file should be included
+     *
+     * @param  file  the remote file to be tested
+     * @return  <code>true</code> if and only if <code>file</code> should be included
+     */
+    boolean accept(RemoteFile file);
+
+}

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileMessage.java?rev=729480&r1=729479&r2=729480&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileMessage.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileMessage.java Fri Dec 26 02:53:10 2008
@@ -16,51 +16,19 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.OutputStream;
-import java.util.Map;
-
 import org.apache.camel.impl.DefaultMessage;
 
+/**
+ * Remote file message
+ */
 public class RemoteFileMessage extends DefaultMessage {
-    private OutputStream outputStream;
-    private String fullFileName;
-    private String fileName;
-    private String hostname;
-    private long fileLength;
+    private RemoteFile remoteFile;
 
     public RemoteFileMessage() {
     }
 
-    public RemoteFileMessage(String hostname, String fullFileName, String fileName, long fileLength, OutputStream outputStream) {
-        this.hostname = hostname;
-        this.fullFileName = fullFileName;
-        this.fileName = fileName;
-        this.fileLength = fileLength;
-        this.outputStream = outputStream;
-    }
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public void setHostname(String hostname) {
-        this.hostname = hostname;
-    }
-
-    public String getFullFileName() {
-        return fullFileName;
-    }
-
-    public void setFullFileName(String fullFileName) {
-        this.fullFileName = fullFileName;
-    }
-
-    public OutputStream getOutputStream() {
-        return outputStream;
-    }
-
-    public void setOutputStream(OutputStream outputStream) {
-        this.outputStream = outputStream;
+    public RemoteFileMessage(RemoteFile remoteFile) {
+        this.remoteFile = remoteFile;
     }
 
     @Override
@@ -70,10 +38,15 @@
 
     @Override
     protected Object createBody() {
-        if (outputStream != null) {
-            return getExchange().getBinding().extractBodyFromOutputStream(getExchange(), outputStream);
-        }
-        return null;
+        return remoteFile.getBody();
+    }
+
+    public RemoteFile getRemoteFile() {
+        return remoteFile;
+    }
+
+    public void setRemoteFile(RemoteFile remoteFile) {
+        this.remoteFile = remoteFile;
     }
 
     @Override
@@ -82,21 +55,7 @@
     }
 
     @Override
-    protected void populateInitialHeaders(Map<String, Object> map) {
-        super.populateInitialHeaders(map);
-        map.put("file.remote.host", hostname);
-        map.put("file.remote.fullName", fullFileName);
-        map.put("file.remote.name", fileName);
-
-        map.put("CamelFileName", fileName);
-        map.put("CamelFilePath", fullFileName);
-        // set the parent if there is a parent folder
-        if (fullFileName != null && fullFileName.indexOf("/") != -1) {
-            String parent = fullFileName.substring(0, fullFileName.lastIndexOf("/"));
-            map.put("CamelFileParent", parent);
-        }
-        if (fileLength > 0) {
-            map.put("CamelFileLength", new Long(fileLength));
-        }
+    public String toString() {
+        return "RemoteFileMessage: " + remoteFile;
     }
 }

Copied: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperationFailedException.java (from r727937, activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperationFailedException.java?p2=activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperationFailedException.java&p1=activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java&r1=727937&r2=729480&rev=729480&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperationFailedException.java Fri Dec 26 02:53:10 2008
@@ -19,33 +19,55 @@
 import org.apache.camel.RuntimeCamelException;
 
 /**
- * Exception thrown in case of last FTP operation failed.
+ * Exception thrown in case of last remote file operation failed.
  *
  * @version $Revision$
  */
-public class FtpOperationFailedException extends RuntimeCamelException {
+public class RemoteFileOperationFailedException extends RuntimeCamelException {
     private final int code;
     private final String reason;
 
-    public FtpOperationFailedException(int code, String reason) {
-        super("Ftp operation failed: " + reason + ". Code: " + code);
+    public RemoteFileOperationFailedException(String message) {
+        super(message);
+        this.code = 0;
+        this.reason = null;
+    }
+
+    public RemoteFileOperationFailedException(String message, Throwable cause) {
+        super(message, cause);
+        this.code = 0;
+        this.reason = null;
+    }
+
+    public RemoteFileOperationFailedException(int code, String reason) {
+        super("Remote file operation failed: " + reason + ". Code: " + code);
+        this.code = code;
+        this.reason = reason;
+    }
+
+    public RemoteFileOperationFailedException(int code, String reason, Throwable cause) {
+        super("Remote file operation failed: " + reason + ". Code: " + code, cause);
         this.code = code;
         this.reason = reason;
     }
 
-    public FtpOperationFailedException(int code, String reason, String message) {
+    public RemoteFileOperationFailedException(int code, String reason, String message) {
         this(code, reason + " " + message);
     }
 
+    public RemoteFileOperationFailedException(int code, String reason, String message, Throwable cause) {
+        this(code, reason + " " + message, cause);
+    }
+
     /**
-     * Return the FTP failure code
+     * Return the FTP failure code (if any)
      */
     public int getCode() {
         return code;
     }
 
     /**
-     * Return the FTP failure reason
+     * Return the FTP failure reason (if any)
      */
     public String getReason() {
         return reason;

Propchange: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperationFailedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperationFailedException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Remote file operations based on some backing framework
+ */
+public interface RemoteFileOperations<T> {
+
+    /**
+     * Connects to the remote server
+     *
+     * @param configuration configuraiton
+     * @return true if connected
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean connect(RemoteFileConfiguration configuration) throws RemoteFileOperationFailedException;
+
+    /**
+     * Returns whether we are connected to the remote server or not
+     *
+     * @return true if connected, false if not
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean isConnected() throws RemoteFileOperationFailedException;
+
+    /**
+     * Discconects from the remote server
+     *
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    void disconnect() throws RemoteFileOperationFailedException;
+
+    /**
+     * Deletes the file from the remote server
+     *
+     * @param name name of the file
+     * @return true if deleted, false if not
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean deleteFile(String name) throws RemoteFileOperationFailedException;
+
+    /**
+     * Renames the file on the remote server
+     *
+     * @param from original name
+     * @param to   the new name
+     * @return true if renamed, false if not
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean renameFile(String from, String to) throws RemoteFileOperationFailedException;
+
+    /**
+     * Builds the directory structure on the remote server. Will test if the folder already exists.
+     *
+     * @param directory the directory path to build
+     * @return true if build or already exists, false if not possbile (could be lack of permissions)
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean buildDirectory(String directory) throws RemoteFileOperationFailedException;
+
+    /**
+     * Retrieves the remote file (download)
+     *
+     * @param name  name of the file
+     * @param out   stream to write the content of the file into
+     * @return true if file has been retrieved, false if not
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean retrieveFile(String name, OutputStream out) throws RemoteFileOperationFailedException;
+
+    /**
+     * Stores the content as a new remote file (upload)
+     *
+     * @param name  name of new file
+     * @param body  content of the file
+     * @return true if the file was stored, false if not
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    boolean storeFile(String name, InputStream body) throws RemoteFileOperationFailedException;
+
+    /**
+     * Gets the current remote directory
+     * @return the current directory path
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    String getCurrentDirectory() throws RemoteFileOperationFailedException;
+
+    /**
+     * Change the current remote directory
+     *
+     * @param path the path to change to
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    void changeCurrentDirectory(String path) throws RemoteFileOperationFailedException;
+
+    /**
+     * List the files in the current remote directory
+     *
+     * @return a list of backing objects representing the files
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    List listFiles() throws RemoteFileOperationFailedException;
+
+    /**
+     * List the files in the given remote directory
+     *
+     * @param path  the remote directory
+     * @return a list of backing objects representing the files
+     * @throws RemoteFileOperationFailedException can be thrown
+     */
+    List listFiles(String path) throws RemoteFileOperationFailedException;
+
+}

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProcessStrategy.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProcessStrategy.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProcessStrategy.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+/**
+ * Represents a strategy for marking that a remote file is processed.
+ */
+public interface RemoteFileProcessStrategy {
+
+    /**
+     * Called when work is about to begin on this file. This method may attempt to acquire some file lock before
+     * returning true; returning false if the file lock could not be obtained so that the file should be ignored.
+     *
+     * @param operations ftp operations
+     * @param endpoint the endpoint
+     * @param exchange the exchange
+     * @param file     the remote file
+     * @return true if the file can be processed (such as if a file lock could be obtained)
+     * @throws Exception can be thrown in case of errors
+     */
+    boolean begin(RemoteFileOperations operations, RemoteFileEndpoint endpoint, RemoteFileExchange exchange, RemoteFile file) throws Exception;
+
+    /**
+     * Releases any file locks and possibly deletes or moves the file after successful processing
+     *
+     * @param operations ftp operations
+     * @param endpoint the endpoint
+     * @param exchange the exchange
+     * @param file     the remote file
+     * @throws Exception can be thrown in case of errors
+     */
+    void commit(RemoteFileOperations operations, RemoteFileEndpoint endpoint, RemoteFileExchange exchange, RemoteFile file) throws Exception;
+
+    /**
+     * Releases any file locks and possibly deletes or moves the file after unsuccessful processing
+     *
+     * @param operations ftp operations
+     * @param endpoint the endpoint
+     * @param exchange the exchange
+     * @param file     the remote file
+     */
+    void rollback(RemoteFileOperations operations, RemoteFileEndpoint endpoint, RemoteFileExchange exchange, RemoteFile file);
+
+}

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java?rev=729480&r1=729479&r2=729480&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java Fri Dec 26 02:53:10 2008
@@ -16,21 +16,123 @@
  */
 package org.apache.camel.component.file.remote;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.component.file.FileComponent;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.language.simple.FileLanguage;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public abstract class RemoteFileProducer<T extends RemoteFileExchange> extends DefaultProducer {
-    protected final transient Log log = LogFactory.getLog(getClass());
-    protected RemoteFileEndpoint<T> endpoint;
+/**
+ * Remote file producer
+ */
+public class RemoteFileProducer extends DefaultProducer {
+    private static final transient Log LOG = LogFactory.getLog(RemoteFileProducer.class);
+    private RemoteFileEndpoint endpoint;
+    private RemoteFileOperations ftp;
+    private boolean loggedIn;
 
-    protected RemoteFileProducer(RemoteFileEndpoint<T> endpoint) {
+    protected RemoteFileProducer(RemoteFileEndpoint endpoint, RemoteFileOperations ftp) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.ftp = ftp;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        RemoteFileExchange remoteExchange = (RemoteFileExchange) endpoint.createExchange(exchange);
+        processExchange(remoteExchange);
+        ExchangeHelper.copyResults(exchange, remoteExchange);
+    }
+
+    protected void processExchange(RemoteFileExchange exchange) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing " + exchange);
+        }
+
+        try {
+            connectIfNecessary();
+
+            if (!loggedIn) {
+                // must be logged in to be able to upload the file
+                String message = "Could not connect/login to: " + endpoint.remoteServerInformation();
+                throw new RemoteFileOperationFailedException(message);
+            }
+
+            String target = createFileName(exchange);
+
+            // should we write to a temporary name and then afterwards rename to real target
+            boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempPrefix());
+            String tempTarget = null;
+            if (writeAsTempAndRename) {
+                // compute temporary name with the temp prefix
+                tempTarget = createTempFileName(target);
+            }
+
+            // upload the file
+            writeFile(exchange, tempTarget != null ? tempTarget : target);
+
+            // if we did write to a temporary name then rename it to the real name after we have written the file
+            if (tempTarget != null) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Renaming file: " + tempTarget + " to: " + target);
+                }
+                boolean renamed = ftp.renameFile(tempTarget, target);
+                if (!renamed) {
+                    throw new RemoteFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
+                }
+            }
+
+            // lets store the name we really used in the header, so end-users can retrieve it
+            exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME_PRODUCED, target);
+
+        } catch (Exception e) {
+            loggedIn = false;
+            if (isStopping() || isStopped()) {
+                // if we are stopping then ignore any exception during a poll
+                LOG.debug("Exception occurd during stopping. " + e.getMessage());
+            } else {
+                LOG.debug("Exception occurd during processing.", e);
+                disconnect();
+                // Rethrow to signify that we didn't poll
+                throw e;
+            }
+        }
+    }
+
+    protected void writeFile(Exchange exchange, String fileName) throws RemoteFileOperationFailedException, IOException {
+        InputStream payload = exchange.getIn().getBody(InputStream.class);
+        try {
+            // build directory
+            int lastPathIndex = fileName.lastIndexOf('/');
+            if (lastPathIndex != -1) {
+                String directory = fileName.substring(0, lastPathIndex);
+                if (!ftp.buildDirectory(directory)) {
+                    LOG.warn("Couldn't build directory: " + directory + " (could be because of denied permissions)");
+                }
+            }
+
+            // upload
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("About to send: " + fileName + " to: " + remoteServer() + " from exchange: " + exchange);
+            }
+
+            boolean success = ftp.storeFile(fileName, payload);
+            if (!success) {
+                throw new RemoteFileOperationFailedException("Error sending file: " + fileName + " to: " + remoteServer());
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sent: " + fileName + " to: " + remoteServer());
+            }
+        } finally {
+            ObjectHelper.close(payload, "Closing payload", LOG);
+        }
     }
 
     protected String createFileName(Exchange exchange) {
@@ -39,23 +141,23 @@
         String name = exchange.getIn().getHeader(FileComponent.HEADER_FILE_NAME, String.class);
 
         // expression support
-        Expression expression = endpoint.getConfiguration().getExpression();
+        Expression expression = endpoint.getExpression();
         if (name != null) {
             // the header name can be an expression too, that should override whatever configured on the endpoint
             if (name.indexOf("${") > -1) {
-                if (log.isDebugEnabled()) {
-                    log.debug(FileComponent.HEADER_FILE_NAME + " contains a FileLanguage expression: " + name);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(FileComponent.HEADER_FILE_NAME + " contains a FileLanguage expression: " + name);
                 }
                 expression = FileLanguage.file(name);
             }
         }
         if (expression != null) {
-            if (log.isDebugEnabled()) {
-                log.debug("Filename evaluated as expression: " + expression);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Filename evaluated as expression: " + expression);
             }
             Object result = expression.evaluate(exchange);
             name = exchange.getContext().getTypeConverter().convertTo(String.class, result);
-        }        
+        }
 
         String endpointFile = endpoint.getConfiguration().getFile();
         if (endpoint.getConfiguration().isDirectory()) {
@@ -64,7 +166,7 @@
             if (endpointFile.length() > 0) {
                 baseDir = endpointFile + (endpointFile.endsWith("/") ? "" : "/");
             }
-            String fileName = (name != null) ? name : endpoint.getGeneratedFileName(exchange.getIn()); 
+            String fileName = (name != null) ? name : endpoint.getGeneratedFileName(exchange.getIn());
             answer = baseDir + fileName;
         } else {
             answer = endpointFile;
@@ -77,21 +179,17 @@
         int path = fileName.lastIndexOf("/");
         if (path == -1) {
             // no path
-            return endpoint.getConfiguration().getTempPrefix() + fileName;
+            return endpoint.getTempPrefix() + fileName;
         } else {
             StringBuilder sb = new StringBuilder(fileName);
-            sb.insert(path + 1, endpoint.getConfiguration().getTempPrefix());
+            sb.insert(path + 1, endpoint.getTempPrefix());
             return sb.toString();
         }
     }
 
-    protected String remoteServer() {
-        return endpoint.getConfiguration().remoteServerInformation();
-    }
-
     @Override
     protected void doStart() throws Exception {
-        log.info("Starting");
+        LOG.debug("Starting");
         // do not connect when component starts, just wait until we process as we will
         // connect at that time if needed
         super.doStart();
@@ -99,19 +197,38 @@
 
     @Override
     protected void doStop() throws Exception {
-        log.info("Stopping");
-        // disconnect when stopping
+        LOG.debug("Stopping");
         try {
             disconnect();
         } catch (Exception e) {
-            // ignore just log a warning
-            log.warn("Exception occured during disconecting from " + remoteServer() + ". "
-                     + e.getClass().getCanonicalName() + " message: " + e.getMessage());
+            // ignore by logging it
+            LOG.debug("Exception occured during disconnecting from " + remoteServer() + " " + e.getMessage());
         }
         super.doStop();
     }
 
-    protected abstract void connectIfNecessary() throws Exception;
+    protected void connectIfNecessary() throws IOException {
+        if (!ftp.isConnected() || !loggedIn) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Not connected/logged in, connecting to " + remoteServer());
+            }
+            loggedIn = ftp.connect(endpoint.getConfiguration());
+            if (!loggedIn) {
+                return;
+            }
+            LOG.info("Connected and logged in to " + remoteServer());
+        }
+    }
+
+    public void disconnect() throws IOException {
+        loggedIn = false;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Disconnecting from " + remoteServer());
+        }
+        ftp.disconnect();
+    }
 
-    protected abstract void disconnect() throws Exception;
+    protected String remoteServer() {
+        return endpoint.remoteServerInformation();
+    }
 }

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=729480&r1=729479&r2=729480&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java (original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java Fri Dec 26 02:53:10 2008
@@ -16,281 +16,92 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Vector;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.List;
 
 import com.jcraft.jsch.ChannelSftp;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
-
 import org.apache.camel.Processor;
-import org.apache.camel.component.file.FileComponent;
-
-public class SftpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
-    private final SftpEndpoint endpoint;
 
-    private ChannelSftp channel;
-    private Session session;
-
-    public SftpConsumer(SftpEndpoint endpoint, Processor processor, Session session) {
-        super(endpoint, processor);
-        this.endpoint = endpoint;
-        this.session = session;
-    }
-
-    public SftpConsumer(SftpEndpoint endpoint, Processor processor, Session session, ScheduledExecutorService executor) {
-        super(endpoint, processor, executor);
-        this.endpoint = endpoint;
-        this.session = session;
-    }
+/**
+ * SFTP consumer
+ */
+public class SftpConsumer extends RemoteFileConsumer {
 
-    protected void doStart() throws Exception {
-        log.info("Starting");
-        super.doStart();
+    public SftpConsumer(RemoteFileEndpoint endpoint, Processor processor, RemoteFileOperations remoteFileOperations) {
+        super(endpoint, processor, remoteFileOperations);
     }
 
-    protected void doStop() throws Exception {
-        log.info("Stopping");
-        // disconnect when stopping
-        try {
-            disconnect();
-        } catch (Exception e) {
-            // ignore just log a warning
-            log.warn("Exception occured during disconecting from " + remoteServer() + ". "
-                     + e.getClass().getCanonicalName() + " message: " + e.getMessage());
+    protected void pollDirectory(String fileName, boolean processDir, List<RemoteFile> fileList) {
+        if (fileName == null) {
+            return;
         }
-        super.doStop();
-    }
 
-    protected void connectIfNecessary() throws JSchException {
-        if (channel == null || !channel.isConnected()) {
-            if (session == null || !session.isConnected()) {
-                log.trace("Session isn't connected, trying to recreate and connect.");
-                session = endpoint.createSession();
-                session.connect();
-            }
-            log.trace("Channel isn't connected, trying to recreate and connect.");
-            channel = endpoint.createChannelSftp(session);
-            channel.connect();
-            log.info("Connected to " + remoteServer());
-        }
-    }
+        String currentDir = operations.getCurrentDirectory();
+        operations.changeCurrentDirectory(fileName);
 
-    protected void disconnect() throws JSchException {
-        log.debug("Disconnecting from " + remoteServer());
-        if (session != null) {
-            session.disconnect();
-        }
-        if (channel != null) {
-            channel.disconnect();
-        }
-    }
-
-    protected void poll() throws Exception {
-        if (log.isTraceEnabled()) {
-            log.trace("Polling " + endpoint.getConfiguration());
-        }
-        connectIfNecessary();
-        // If the attempt to connect isn't successful, then the thrown
-        // exception will signify that we couldn't poll
-        try {
-            final String fileName = endpoint.getConfiguration().getFile();
-            if (endpoint.getConfiguration().isDirectory()) {
-                pollDirectory(fileName);
-            } else {
-                int index = fileName.lastIndexOf('/');
-                if (index > -1) {
-                    // cd to the folder of the filename
-                    channel.cd(fileName.substring(0, index));
-                }
-
-                // list the files in the fold and poll the first file
-                final Vector files = channel.ls(fileName.substring(index + 1));
-                final ChannelSftp.LsEntry file = (ChannelSftp.LsEntry) files.get(0);
-                pollFile(file);
-            }
-            lastPollTime = System.currentTimeMillis();
-        } catch (Exception e) {
-            if (isStopping() || isStopped()) {
-                // if we are stopping then ignore any exception during a poll
-                log.warn("Consumer is stopping. Ignoring caught exception: "
-                         + e.getClass().getCanonicalName() + " message: " + e.getMessage());
-            } else {
-                log.warn("Exception occured during polling: "
-                         + e.getClass().getCanonicalName() + " message: " + e.getMessage());
-                disconnect();
-                // Rethrow to signify that we didn't poll
-                throw e;
-            }
-        }
-    }
-
-    protected void pollDirectory(String dir) throws Exception {
         if (log.isTraceEnabled()) {
-            log.trace("Polling directory: " + dir);
+            log.trace("Polling directory: " + fileName);
         }
-        String currentDir = channel.pwd();
-
-        channel.cd(dir);
-        Vector files = channel.ls(".");
-        for (int i = 0; i < files.size(); i++) {
-            ChannelSftp.LsEntry sftpFile = (ChannelSftp.LsEntry)files.get(i);
-            if (sftpFile.getFilename().startsWith(".")) {
-                // skip
-            } else if (sftpFile.getAttrs().isDir()) {
-                if (isRecursive()) {
-                    pollDirectory(getFullFileName(sftpFile));
-                }
+        List<ChannelSftp.LsEntry> files = operations.listFiles();
+        for (ChannelSftp.LsEntry file : files) {
+            RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(file);
+            if (processDir && file.getAttrs().isDir() && isValidFile(remote, true)) {
+                // recursive scan and add the sub files and folders
+                pollDirectory(file.getFilename(), endpoint.isRecursive(), fileList);
+            } else if (!file.getAttrs().isLink() && isValidFile(remote, false)) {
+                // matched file so add
+                fileList.add(remote);
             } else {
-                pollFile(sftpFile);
+                log.debug("Ignoring unsupported file type " + file);
             }
         }
 
-        // change back to original current dir
-        channel.cd(currentDir);
+        operations.changeCurrentDirectory(currentDir);
     }
 
-    protected String getFullFileName(ChannelSftp.LsEntry sftpFile) throws IOException, SftpException {
-        return channel.pwd() + "/" + sftpFile.getFilename();
-    }
-
-    private void pollFile(ChannelSftp.LsEntry sftpFile) throws Exception {
-        if (log.isTraceEnabled()) {
-            log.trace("Polling file: " + sftpFile);
+    /**
+     * Polls the given file
+     *
+     * @param fileName  the file name
+     * @param fileList  current list of files gathered
+     */
+    protected void pollFile(String fileName, List<RemoteFile> fileList) {
+        int index = fileName.lastIndexOf("/");
+        if (index > -1) {
+            // cd to the folder of the filename
+            operations.changeCurrentDirectory(fileName.substring(0, index));
         }
-
-        // if using last polltime for timestamp matcing (to be removed in Camel 2.0)
-        boolean timestampMatched = true;
-        if (isTimestamp()) {
-            // TODO do we need to adjust the TZ? can we?
-            long ts = sftpFile.getAttrs().getMTime() * 1000L;
-            timestampMatched = ts > lastPollTime;
-            if (log.isTraceEnabled()) {
-                log.trace("The file is to old + " + sftpFile + ". lastPollTime=" + lastPollTime + " > fileTimestamp=" + ts);
-            }
-        }
-
-        if (timestampMatched && isMatched(sftpFile)) {
-            String fullFileName = getFullFileName(sftpFile);
-
-            // is we use excluse read then acquire the exclusive read (waiting until we got it)
-            if (exclusiveReadLock) {
-                acquireExclusiveReadLock(sftpFile);
-            }
-
-            // retrieve the file
-            final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            channel.get(sftpFile.getFilename(), byteArrayOutputStream);
-            if (log.isDebugEnabled()) {
-                log.debug("Retrieved file: " + sftpFile.getFilename() + " from: " + remoteServer());
-            }
-
-            RemoteFileExchange exchange = endpoint.createExchange(getFullFileName(sftpFile),
-                    sftpFile.getFilename(), sftpFile.getAttrs().getSize(), byteArrayOutputStream);
-
-            if (isSetNames()) {
-                String ftpBasePath = endpoint.getConfiguration().getFile();
-                String relativePath = fullFileName.substring(ftpBasePath.length() + 1);
-                relativePath = relativePath.replaceFirst("/", "");
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting exchange filename to " + relativePath);
-                }
-                exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, relativePath);
-            }
-
-            if (deleteFile) {
-                // delete file after consuming
-                if (log.isDebugEnabled()) {
-                    log.debug("Deleteing file: " + sftpFile.getFilename() + " from: " + remoteServer());
-                }
-                deleteFile(sftpFile.getFilename());
-            } else if (isMoveFile()) {
-                String fromName = sftpFile.getFilename();
-                String toName = getMoveFileName(fromName, exchange);
-                if (log.isDebugEnabled()) {
-                    log.debug("Moving file: " + fromName + " to: " + toName);
-                }
-
-                // delete any existing file
-                boolean deleted = deleteFile(toName);
-                if (!deleted) {
-                    // if we could not delete any existing file then maybe the folder is missing
-                    // build folder if needed
-                    int lastPathIndex = toName.lastIndexOf('/');
-                    if (lastPathIndex != -1) {
-                        String directory = toName.substring(0, lastPathIndex);
-                        if (!SftpUtils.buildDirectory(channel, directory)) {
-                            log.warn("Can not build directory: " + directory + " (maybe because of denied permissions)");
-                        }
-                    }
-                }
-
-                // try to rename
-                try {
-                    channel.rename(fromName, toName);
-                } catch (SftpException e) {
-                    // ignore just log a warning
-                    log.warn("Can not move file: " + fromName + " to: " + toName);
-                }
-            }
-
-            getProcessor().process(exchange);
+        // list the files in the fold and poll the first file
+        List<ChannelSftp.LsEntry> list = operations.listFiles(fileName.substring(index + 1));
+        ChannelSftp.LsEntry file = list.get(0);
+        if (file != null) {
+            RemoteFile remoteFile = asRemoteFile(file);
+            fileList.add(remoteFile);
         }
     }
 
-    private boolean deleteFile(String filename) {
-        try {
-            channel.rm(filename);
-            return true;
-        } catch (SftpException e) {
-            // ignore just log a warning
-            log.warn("Could not delete file: " + filename + " from: " + remoteServer());
-            return false;
-        }
-    }
+    private RemoteFile<ChannelSftp.LsEntry> asRemoteFile(ChannelSftp.LsEntry file) {
+        RemoteFile<ChannelSftp.LsEntry> remote = new RemoteFile<ChannelSftp.LsEntry>();
+        remote.setFile(file);
+        remote.setFileName(file.getFilename());
+        remote.setFileLength(file.getAttrs().getSize());
+        remote.setLastModified(file.getAttrs().getMTime() * 1000L);
+        remote.setHostname(endpoint.getConfiguration().getHost());
+        String absoluteFileName = getAbsoluteFileName(file);
+        remote.setAbsolutelFileName(absoluteFileName);
 
-    protected void acquireExclusiveReadLock(ChannelSftp.LsEntry sftpFile) throws SftpException {
-        if (log.isTraceEnabled()) {
-            log.trace("Waiting for exclusive read lock to file: " + sftpFile);
+        // the relative filename
+        String ftpBasePath = endpoint.getConfiguration().getFile();
+        String relativePath = absoluteFileName.substring(ftpBasePath.length() + 1);
+        if (relativePath.startsWith("/")) {
+            relativePath = relativePath.substring(1);
         }
+        remote.setRelativeFileName(relativePath);
 
-        // the trick is to try to rename the file, if we can rename then we have exclusive read
-        // since its a remote file we can not use java.nio to get a RW access
-        String originalName = sftpFile.getFilename();
-        String newName = originalName + ".camelExclusiveReadLock";
-        boolean exclusive = false;
-        while (!exclusive) {
-            try {
-                channel.rename(originalName, newName);
-                exclusive = true;
-            } catch (SftpException e) {
-                // ignore we can not rename it
-            }
-
-            if (exclusive) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Acquired exclusive read lock to file: " + originalName);
-                }
-                // rename it back so we can read it
-                channel.rename(newName, originalName);
-            } else {
-                log.trace("Exclusive read lock not granted. Sleeping for 1000 millis");
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
-        }
+        return remote;
     }
 
-    protected String getFileName(Object file) {
-        ChannelSftp.LsEntry sftpFile = (ChannelSftp.LsEntry) file;
-        return sftpFile.getFilename();
+    private String getAbsoluteFileName(ChannelSftp.LsEntry ftpFile) {
+        return operations.getCurrentDirectory() + "/" + ftpFile.getFilename();
     }
 
 }

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpRemoteFileOperations.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpRemoteFileOperations.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpRemoteFileOperations.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpRemoteFileOperations.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.SftpException;
+import com.jcraft.jsch.UserInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.camel.util.ObjectHelper.isNotEmpty;
+
+/**
+ * SFTP remote file operations
+ */
+public class SftpRemoteFileOperations implements RemoteFileOperations<ChannelSftp> {
+    private static final Log LOG = LogFactory.getLog(SftpRemoteFileOperations.class);
+    private ChannelSftp channel;
+    private Session session;
+
+    public boolean connect(RemoteFileConfiguration configuration) throws RemoteFileOperationFailedException {
+        try {
+            if (isConnected()) {
+                // already connected
+                return true;
+            }
+            if (channel == null || !channel.isConnected()) {
+                if (session == null || !session.isConnected()) {
+                    LOG.trace("Session isn't connected, trying to recreate and connect.");
+                    session = createSession(configuration);
+                    session.connect();
+                }
+                LOG.trace("Channel isn't connected, trying to recreate and connect.");
+                channel = (ChannelSftp) session.openChannel("sftp");
+                channel.connect();
+                LOG.info("Connected to " + configuration.remoteServerInformation());
+            }
+
+            return true;
+
+        } catch (JSchException e) {
+            throw new RemoteFileOperationFailedException("Cannot connect to " + configuration.remoteServerInformation(), e);
+        }
+    }
+
+    protected Session createSession(final RemoteFileConfiguration configuration) throws JSchException {
+        final JSch jsch = new JSch();
+
+        if (isNotEmpty(configuration.getPrivateKeyFile())) {
+            LOG.debug("Using private keyfile: " + configuration.getPrivateKeyFile());
+            if (isNotEmpty(configuration.getPrivateKeyFilePassphrase())) {
+                jsch.addIdentity(configuration.getPrivateKeyFile(), configuration.getPrivateKeyFilePassphrase());
+            } else {
+                jsch.addIdentity(configuration.getPrivateKeyFile());
+            }
+        }
+
+        if (isNotEmpty(configuration.getKnownHostsFile())) {
+            LOG.debug("Using knownhosts file: " + configuration.getKnownHostsFile());
+            jsch.setKnownHosts(configuration.getKnownHostsFile());
+        }
+
+        final Session session = jsch.getSession(configuration.getUsername(), configuration.getHost(), configuration.getPort());
+        session.setUserInfo(new UserInfo() {
+            public String getPassphrase() {
+                return null;
+            }
+
+            public String getPassword() {
+                return configuration.getPassword();
+            }
+
+            public boolean promptPassword(String s) {
+                return true;
+            }
+
+            public boolean promptPassphrase(String s) {
+                return true;
+            }
+
+            public boolean promptYesNo(String s) {
+                LOG.error(s);
+                // Return 'false' indicating modification of the hosts file is disabled.
+                return false;
+            }
+
+            public void showMessage(String s) {
+            }
+        });
+        return session;
+    }
+
+    public boolean isConnected() throws RemoteFileOperationFailedException {
+        return session != null && session.isConnected() && channel != null && channel.isConnected();
+    }
+
+    public void disconnect() throws RemoteFileOperationFailedException {
+        if (session != null && session.isConnected()) {
+            session.disconnect();
+        }
+        if (channel != null && channel.isConnected()) {
+            channel.disconnect();
+        }
+    }
+
+    public boolean deleteFile(String name) throws RemoteFileOperationFailedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleteing file: " + name);
+        }
+        try {
+            channel.rm(name);
+            return true;
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot delete file: " + name, e);
+        }
+    }
+
+    public boolean renameFile(String from, String to) throws RemoteFileOperationFailedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Renaming file: " + from + " to: " + to);
+        }
+        try {
+            channel.rename(from, to);
+            return true;
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot rename file from: " + from + " to: " + to, e);
+        }
+    }
+
+    public boolean buildDirectory(String dirName) throws RemoteFileOperationFailedException {
+        boolean success = false;
+
+        String originalDirectory = getCurrentDirectory();
+        try {
+            // maybe the full directory already exsits
+            try {
+                channel.cd(dirName);
+                success = true;
+            } catch (SftpException e) {
+                // ignore, we could not change directory so try to create it instead
+            }
+
+            if (!success) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Trying to build remote directory: " + dirName);
+                }
+
+                try {
+                    channel.mkdir(dirName);
+                    success = true;
+                } catch (SftpException e) {
+                    // we are here if the server side doesn't create intermediate folders
+                    // so create the folder one by one
+                    success = buildDirectoryChunks(dirName);
+                }
+            }
+        } catch (IOException e) {
+            throw new RemoteFileOperationFailedException("Cannot build directory " + dirName, e);
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot build directory " + dirName, e);
+        } finally {
+            // change back to original directory
+            if (originalDirectory != null) {
+                changeCurrentDirectory(originalDirectory);
+            }
+        }
+
+        return success;
+    }
+
+    private boolean buildDirectoryChunks(String dirName) throws IOException, SftpException {
+        final StringBuilder sb = new StringBuilder(dirName.length());
+        final String[] dirs = dirName.split("\\/");
+
+        boolean success = false;
+        for (String dir : dirs) {
+            sb.append(dir).append('/');
+            String directory = sb.toString();
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Trying to build remote directory: " + directory);
+            }
+
+            try {
+                channel.mkdir(directory);
+                success = true;
+            } catch (SftpException e) {
+                // ignore keep trying to create the rest of the path
+            }
+        }
+
+        return success;
+    }
+
+    public boolean retrieveFile(String name, OutputStream out) throws RemoteFileOperationFailedException {
+        try {
+            channel.get(name, out);
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot get file: " + name, e);
+        }
+        return true;
+    }
+
+    public boolean storeFile(String name, InputStream body) throws RemoteFileOperationFailedException {
+        try {
+            channel.put(body, name);
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot put file: " + name, e);
+        }
+        return true;
+    }
+
+    public String getCurrentDirectory() throws RemoteFileOperationFailedException {
+        try {
+            return channel.pwd();
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot get current directory", e);
+        }
+    }
+
+    public void changeCurrentDirectory(String path) throws RemoteFileOperationFailedException {
+        try {
+            channel.cd(path);
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot change current directory to: " + path, e);
+        }
+    }
+
+    public List listFiles() throws RemoteFileOperationFailedException {
+        return listFiles(".");
+    }
+
+    public List listFiles(String path) throws RemoteFileOperationFailedException {
+        try {
+            final List list = new ArrayList();
+            Vector files = channel.ls(path);
+            for (Object file : files) {
+                list.add(file);
+            }
+            return list;
+        } catch (SftpException e) {
+            throw new RemoteFileOperationFailedException("Cannot list directory: " + path, e);
+        }
+    }
+
+}
\ No newline at end of file

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DefaultRemoteFileRenamer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DefaultRemoteFileRenamer.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DefaultRemoteFileRenamer.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DefaultRemoteFileRenamer.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.strategy;
+
+import org.apache.camel.component.file.remote.RemoteFile;
+import org.apache.camel.component.file.remote.RemoteFileExchange;
+
+public class DefaultRemoteFileRenamer implements RemoteFileRenamer {
+
+    private String namePrefix;
+    private String namePostfix;
+
+    public DefaultRemoteFileRenamer() {
+    }
+
+    public DefaultRemoteFileRenamer(String namePrefix, String namePostfix) {
+        this.namePrefix = namePrefix;
+        this.namePostfix = namePostfix;
+    }
+
+    public RemoteFile renameFile(RemoteFileExchange exchange, RemoteFile file) {
+        String newName = renameFileName(file.getFileName());
+
+        // clone and change the name
+        RemoteFile result = file.clone();
+        result.changeFileName(newName);
+        return result;
+    }
+
+    public String getNamePostfix() {
+        return namePostfix;
+    }
+
+    /**
+     * Sets the name postfix appended to moved files. For example
+     * to rename all the files from * to *.done set this value to ".done"
+     */
+    public void setNamePostfix(String namePostfix) {
+        this.namePostfix = namePostfix;
+    }
+
+    public String getNamePrefix() {
+        return namePrefix;
+    }
+
+    /**
+     * Sets the name prefix appended to moved files. For example
+     * to move processed files into a hidden directory called ".camel"
+     * set this value to ".camel/"
+     */
+    public void setNamePrefix(String namePrefix) {
+        this.namePrefix = namePrefix;
+    }
+
+    protected String renameFileName(String name) {
+        StringBuffer buffer = new StringBuffer();
+        if (namePrefix != null) {
+            buffer.append(namePrefix);
+        }
+        buffer.append(name);
+        if (namePostfix != null) {
+            buffer.append(namePostfix);
+        }
+        return buffer.toString();
+    }
+
+}

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DeleteRemoteFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DeleteRemoteFileProcessStrategy.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DeleteRemoteFileProcessStrategy.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/DeleteRemoteFileProcessStrategy.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.strategy;
+
+import org.apache.camel.component.file.remote.RemoteFile;
+import org.apache.camel.component.file.remote.RemoteFileEndpoint;
+import org.apache.camel.component.file.remote.RemoteFileExchange;
+import org.apache.camel.component.file.remote.RemoteFileOperationFailedException;
+import org.apache.camel.component.file.remote.RemoteFileOperations;
+
+public class DeleteRemoteFileProcessStrategy extends RemoteFileProcessStrategySupport {
+
+    @Override
+    public void commit(RemoteFileOperations operations, RemoteFileEndpoint endpoint, RemoteFileExchange exchange, RemoteFile file) throws Exception {
+        boolean deleted = operations.deleteFile(file.getAbsolutelFileName());
+        if (!deleted) {
+            throw new RemoteFileOperationFailedException("Cannot delete file: " + file);
+        }
+    }
+
+}
\ No newline at end of file

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/NoOpRemoteFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/NoOpRemoteFileProcessStrategy.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/NoOpRemoteFileProcessStrategy.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/NoOpRemoteFileProcessStrategy.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.strategy;
+
+public class NoOpRemoteFileProcessStrategy extends RemoteFileProcessStrategySupport {
+
+}

Added: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/RemoteFileExpressionRenamer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/RemoteFileExpressionRenamer.java?rev=729480&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/RemoteFileExpressionRenamer.java (added)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/RemoteFileExpressionRenamer.java Fri Dec 26 02:53:10 2008
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote.strategy;
+
+import org.apache.camel.Expression;
+import org.apache.camel.component.file.remote.RemoteFile;
+import org.apache.camel.component.file.remote.RemoteFileExchange;
+import org.apache.camel.util.ObjectHelper;
+
+public class RemoteFileExpressionRenamer implements RemoteFileRenamer {
+    private Expression expression;
+
+    public RemoteFile renameFile(RemoteFileExchange exchange, RemoteFile file) {
+        ObjectHelper.notNull(expression, "expression");
+
+        Object eval = expression.evaluate(exchange);
+        String newName = exchange.getContext().getTypeConverter().convertTo(String.class, eval);
+
+        // clone and change the name
+        RemoteFile result = file.clone();
+        result.changeFileName(newName);
+        return result;
+    }
+
+    public Expression getExpression() {
+        return expression;
+    }
+
+    public void setExpression(Expression expression) {
+        this.expression = expression;
+    }    
+}



Mime
View raw message