camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dval...@apache.org
Subject svn commit: r1448198 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/component/file/strategy/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/ components/c...
Date Wed, 20 Feb 2013 14:47:06 GMT
Author: dvaleri
Date: Wed Feb 20 14:47:06 2013
New Revision: 1448198

URL: http://svn.apache.org/r1448198
Log:
[CAMEL-6090] Added streaming option for retrieval of remote files.

Added:
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java
      - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java
  (with props)
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java
      - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java
      - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java
      - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java
      - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java
      - copied, changed from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
    camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileOperations.java
Wed Feb 20 14:47:06 2013
@@ -155,6 +155,11 @@ public class FileOperations implements G
         // noop as we use type converters to read the body content for java.io.File
         return true;
     }
+    
+    @Override
+    public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException
{
+        // noop as we used type converters to read the body content for java.io.File
+    }
 
     public boolean storeFile(String fileName, Exchange exchange) throws GenericFileOperationFailedException
{
         ObjectHelper.notNull(endpoint, "endpoint");

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOperations.java
Wed Feb 20 14:47:06 2013
@@ -77,6 +77,14 @@ public interface GenericFileOperations<T
      * @throws GenericFileOperationFailedException can be thrown
      */
     boolean retrieveFile(String name, Exchange exchange) throws GenericFileOperationFailedException;
+    
+    /**
+     * Releases the resources consumed by a retrieved file
+     * 
+     * @param exchange exchange with the content of the file
+     * @throws GenericFileOperationFailedException can be thrown
+     */
+    void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException;
 
     /**
      * Stores the content as a new remote file (upload)

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/GenericFileProcessStrategySupport.java
Wed Feb 20 14:47:06 2013
@@ -62,6 +62,7 @@ public abstract class GenericFileProcess
         }
 
         deleteLocalWorkFile(exchange);
+        operations.releaseRetreivedFileResources(exchange);
     }
 
     public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T>
endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
@@ -70,6 +71,7 @@ public abstract class GenericFileProcess
         }
 
         deleteLocalWorkFile(exchange);
+        operations.releaseRetreivedFileResources(exchange);
     }
 
     public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T>
endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
@@ -78,6 +80,7 @@ public abstract class GenericFileProcess
         }
 
         deleteLocalWorkFile(exchange);
+        operations.releaseRetreivedFileResources(exchange);
     }
 
     public GenericFileExclusiveReadLockStrategy<T> getExclusiveReadLockStrategy() {

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
(original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
Wed Feb 20 14:47:06 2013
@@ -298,17 +298,29 @@ public class FtpOperations implements Re
             return retrieveFileToStreamInBody(name, exchange);
         }
     }
+    
+    @Override
+    public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException
{
+        InputStream is = exchange.getIn().getHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM,
InputStream.class);
+        
+        if (is != null) {
+            try {
+                is.close();
+                client.completePendingCommand();
+            } catch (IOException e) {
+                throw new GenericFileOperationFailedException(e.getMessage(), e);
+            }
+        }
+    }
 
     @SuppressWarnings("unchecked")
     private boolean retrieveFileToStreamInBody(String name, Exchange exchange) throws GenericFileOperationFailedException
{
         OutputStream os = null;
         boolean result;
         try {
-            os = new ByteArrayOutputStream();
             GenericFile<FTPFile> target = (GenericFile<FTPFile>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
             ObjectHelper.notNull(target, "Exchange should have the " + FileComponent.FILE_EXCHANGE_FILE
+ " set");
-            target.setBody(os);
-
+            
             String remoteName = name;
             String currentDir = null;
             if (endpoint.getConfiguration().isStepwise()) {
@@ -326,7 +338,17 @@ public class FtpOperations implements Re
             }
 
             log.trace("Client retrieveFile: {}", remoteName);
-            result = client.retrieveFile(remoteName, os);
+            
+            if (endpoint.getConfiguration().isStreamDownload()) {
+                InputStream is = client.retrieveFileStream(remoteName); 
+                target.setBody(is);
+                exchange.getIn().setHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM,
is);
+                result = true;
+            } else {
+                os = new ByteArrayOutputStream();
+                target.setBody(os);
+                result = client.retrieveFile(remoteName, os);
+            }
 
             // change back to current directory
             if (endpoint.getConfiguration().isStepwise()) {

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java
(original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileComponent.java
Wed Feb 20 14:47:06 2013
@@ -26,6 +26,8 @@ import org.apache.camel.component.file.G
  * @param <T> the type of file that these remote endpoints provide
  */
 public abstract class RemoteFileComponent<T> extends GenericFileComponent<T>
{
+    
+    public static final String REMOTE_FILE_INPUT_STREAM = "CamelRemoteFileInputStream";
 
     public RemoteFileComponent() {
     }

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
(original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConfiguration.java
Wed Feb 20 14:47:06 2013
@@ -49,6 +49,7 @@ public abstract class RemoteFileConfigur
     private String siteCommand;
     private boolean stepwise = true;
     private PathSeparator separator = PathSeparator.Auto;
+    private boolean streamDownload;
 
     public RemoteFileConfiguration() {
     }
@@ -260,6 +261,21 @@ public abstract class RemoteFileConfigur
     public void setSeparator(PathSeparator separator) {
         this.separator = separator;
     }
+    
+    public boolean isStreamDownload() {
+        return streamDownload;
+    }
+
+    /**
+     * Sets the download method to use when not using a local working directory.  If set to true,
+     * the remote files are streamed to the route as they are read.  When set to false, the remote files
+     * are loaded into memory before being sent into the route.
+     *
+     * @param streamDownload 
+     */
+    public void setStreamDownload(boolean streamDownload) {
+        this.streamDownload = streamDownload;
+    }
 
     /**
      * Normalizes the given path according to the configured path separator.

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
(original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java
Wed Feb 20 14:47:06 2013
@@ -489,18 +489,29 @@ public class SftpOperations implements R
             return retrieveFileToStreamInBody(name, exchange);
         }
     }
+    
+    @Override
+    public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException
{
+        InputStream is = exchange.getIn().getHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM,
InputStream.class);
+        
+        if (is != null) {
+            try {
+                is.close();
+            } catch (IOException e) {
+                throw new GenericFileOperationFailedException(e.getMessage(), e);
+            }
+        }
+    }
 
     @SuppressWarnings("unchecked")
     private boolean retrieveFileToStreamInBody(String name, Exchange exchange) throws GenericFileOperationFailedException
{
         OutputStream os = null;
         String currentDir = null;
         try {
-            os = new ByteArrayOutputStream();
             GenericFile<ChannelSftp.LsEntry> target =
                     (GenericFile<ChannelSftp.LsEntry>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
             ObjectHelper.notNull(target, "Exchange should have the " + FileComponent.FILE_EXCHANGE_FILE
+ " set");
-            target.setBody(os);
-
+            
             String remoteName = name;
             if (endpoint.getConfiguration().isStepwise()) {
                 // remember current directory
@@ -518,7 +529,15 @@ public class SftpOperations implements R
 
             // use input stream which works with Apache SSHD used for testing
             InputStream is = channel.get(remoteName);
-            IOHelper.copyAndCloseInput(is, os);
+            
+            if (endpoint.getConfiguration().isStreamDownload()) {
+                target.setBody(is);
+                exchange.getIn().setHeader(RemoteFileComponent.REMOTE_FILE_INPUT_STREAM,
is);
+            } else {
+                os = new ByteArrayOutputStream();
+                target.setBody(os);
+                IOHelper.copyAndCloseInput(is, os);
+            }
 
             return true;
         } catch (IOException e) {

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java
(from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest.java
Wed Feb 20 14:47:06 2013
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.file.remote.sftp;
+package org.apache.camel.component.file.remote;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
@@ -24,10 +24,10 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class SftpSimpleConsumeTest extends SftpServerTestSupport {
+public class CopyOfFtpSimpleConsumeStreamingWithMultipleFilesTest extends FtpServerTestSupport
{
 
     @Test
-    public void testSftpSimpleConsume() throws Exception {
+    public void testFtpSimpleConsumeAbsolute() throws Exception {
         if (!canTest()) {
             return;
         }
@@ -35,7 +35,10 @@ public class SftpSimpleConsumeTest exten
         String expected = "Hello World";
 
         // create file using regular file
-        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME,
"hello.txt");
+
+        // FTP Server does not support absolute path, so lets simulate it
+        String path = FTP_ROOT_DIR + "/tmp/mytemp";
+        template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt");
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
@@ -51,7 +54,9 @@ public class SftpSimpleConsumeTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true")
+                // notice we use an absolute starting path: /tmp/mytemp
+                // - we must remember to use // slash because of the url separator
+                from("ftp://localhost:" + getPort() + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true")
                     .routeId("foo").noAutoStartup()
                     .to("mock:result");
             }

Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java?rev=1448198&view=auto
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java
(added)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java
Wed Feb 20 14:47:06 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.InputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class FtpSimpleConsumeStreamingPartialReadTest extends FtpServerTestSupport {
+
+    @Test
+    public void testFtpSimpleConsumeAbsolute() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        String expected = "Hello World";
+
+        // create file using regular file
+
+        // FTP Server does not support absolute path, so lets simulate it
+        String path = FTP_ROOT_DIR + "/tmp/mytemp";
+        template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+        
+        // Wait a little bit for the move to finish.
+        Thread.sleep(2000);
+        
+        File resultFile = new File(path + File.separator + "failed", "hello.txt");
+        assertTrue(resultFile.exists());
+        assertFalse(resultFile.isDirectory());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // notice we use an absolute starting path: /tmp/mytemp
+                // - we must remember to use // slash because of the url separator
+                from("ftp://localhost:" + getPort()
+                         + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true"
+                         + "&move=done&moveFailed=failed")
+                    .routeId("foo").noAutoStartup()
+                    .process(new Processor() {
+                        
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            exchange.getIn().getBody(InputStream.class).read();
+                            
+                        }
+                    })
+                    .to("mock:result")
+                    .process(new Processor() {
+                        
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            throw new Exception("INTENTIONAL ERROR");
+                        }
+                    });
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingPartialReadTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java
(from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingTest.java
Wed Feb 20 14:47:06 2013
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.file.remote.sftp;
+package org.apache.camel.component.file.remote;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
@@ -24,10 +24,10 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class SftpSimpleConsumeTest extends SftpServerTestSupport {
+public class FtpSimpleConsumeStreamingTest extends FtpServerTestSupport {
 
     @Test
-    public void testSftpSimpleConsume() throws Exception {
+    public void testFtpSimpleConsumeAbsolute() throws Exception {
         if (!canTest()) {
             return;
         }
@@ -35,7 +35,10 @@ public class SftpSimpleConsumeTest exten
         String expected = "Hello World";
 
         // create file using regular file
-        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME,
"hello.txt");
+
+        // FTP Server does not support absolute path, so lets simulate it
+        String path = FTP_ROOT_DIR + "/tmp/mytemp";
+        template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt");
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
@@ -51,7 +54,9 @@ public class SftpSimpleConsumeTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true")
+                // notice we use an absolute starting path: /tmp/mytemp
+                // - we must remember to use // slash because of the url separator
+                from("ftp://localhost:" + getPort() + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true")
                     .routeId("foo").noAutoStartup()
                     .to("mock:result");
             }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java
(from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpSimpleConsumeStreamingWithMultipleFilesTest.java
Wed Feb 20 14:47:06 2013
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.file.remote.sftp;
+package org.apache.camel.component.file.remote;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
@@ -24,22 +24,27 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class SftpSimpleConsumeTest extends SftpServerTestSupport {
+public class FtpSimpleConsumeStreamingWithMultipleFilesTest extends FtpServerTestSupport
{
 
     @Test
-    public void testSftpSimpleConsume() throws Exception {
+    public void testFtpSimpleConsumeAbsolute() throws Exception {
         if (!canTest()) {
             return;
         }
 
         String expected = "Hello World";
+        String expected2 = "Goodbye World";
 
         // create file using regular file
-        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME,
"hello.txt");
+
+        // FTP Server does not support absolute path, so lets simulate it
+        String path = FTP_ROOT_DIR + "/tmp/mytemp";
+        template.sendBodyAndHeader("file:" + path, expected, Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:" + path, expected2, Exchange.FILE_NAME, "goodbye.txt");
 
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(1);
-        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceivedInAnyOrder(expected, expected2);
 
         context.startRoute("foo");
 
@@ -51,7 +56,9 @@ public class SftpSimpleConsumeTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true")
+                // notice we use an absolute starting path: /tmp/mytemp
+                // - we must remember to use // slash because of the url separator
+                from("ftp://localhost:" + getPort() + "//tmp/mytemp?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true")
                     .routeId("foo").noAutoStartup()
                     .to("mock:result");
             }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java
(from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingPartialReadTest.java
Wed Feb 20 14:47:06 2013
@@ -16,15 +16,21 @@
  */
 package org.apache.camel.component.file.remote.sftp;
 
+import java.io.File;
+import java.io.InputStream;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
 /**
+ * Tests that a file move can occur on the server even if the remote stream was only partially read.
+ *
  * @version 
  */
-public class SftpSimpleConsumeTest extends SftpServerTestSupport {
+public class SftpSimpleConsumeStreamingPartialReadTest extends SftpServerTestSupport {
 
     @Test
     public void testSftpSimpleConsume() throws Exception {
@@ -40,10 +46,17 @@ public class SftpSimpleConsumeTest exten
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
-
+        
         context.startRoute("foo");
 
         assertMockEndpointsSatisfied();
+        
+        // Wait a little bit for the move to finish.
+        Thread.sleep(2000);
+        
+        File resultFile = new File(FTP_ROOT_DIR + File.separator + "failed", "hello.txt");
+        assertTrue(resultFile.exists());
+        assertFalse(resultFile.isDirectory());
     }
 
     @Override
@@ -51,9 +64,26 @@ public class SftpSimpleConsumeTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true")
+                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR
+                         + "?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true"
+                         + "&move=done&moveFailed=failed")
                     .routeId("foo").noAutoStartup()
-                    .to("mock:result");
+                    .process(new Processor() {
+                        
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            exchange.getIn().getBody(InputStream.class).read();
+                            
+                        }
+                    })
+                    .to("mock:result")
+                    .process(new Processor() {
+                        
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            throw new Exception("INTENTIONAL ERROR");
+                        }
+                    });
             }
         };
     }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java
(from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java	(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingTest.java	Wed Feb 20 14:47:06 2013
@@ -24,7 +24,7 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class SftpSimpleConsumeTest extends SftpServerTestSupport {
+public class SftpSimpleConsumeStreamingTest extends SftpServerTestSupport {
 
     @Test
     public void testSftpSimpleConsume() throws Exception {
@@ -40,7 +40,8 @@ public class SftpSimpleConsumeTest exten
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
-
+        mock.expectedBodiesReceived(expected);
+        
         context.startRoute("foo");
 
         assertMockEndpointsSatisfied();
@@ -51,7 +52,7 @@ public class SftpSimpleConsumeTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true")
+                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true")
                     .routeId("foo").noAutoStartup()
                     .to("mock:result");
             }

Copied: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java
(from r1447882, camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java?p2=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java&p1=camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java&r1=1447882&r2=1448198&rev=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java	(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeStreamingWithMultipleFilesTest.java	Wed Feb 20 14:47:06 2013
@@ -24,7 +24,7 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class SftpSimpleConsumeTest extends SftpServerTestSupport {
+public class SftpSimpleConsumeStreamingWithMultipleFilesTest extends SftpServerTestSupport {
 
     @Test
     public void testSftpSimpleConsume() throws Exception {
@@ -33,14 +33,16 @@ public class SftpSimpleConsumeTest exten
         }
 
         String expected = "Hello World";
+        String expected2 = "Goodbye World";
 
         // create file using regular file
         template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected2, Exchange.FILE_NAME, "goodbye.txt");
 
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(1);
-        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
-
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceivedInAnyOrder(expected, expected2);
+        
         context.startRoute("foo");
 
         assertMockEndpointsSatisfied();
@@ -51,7 +53,7 @@ public class SftpSimpleConsumeTest exten
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true")
+                from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&streamDownload=true")
                     .routeId("foo").noAutoStartup()
                     .to("mock:result");
             }

Modified: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java?rev=1448198&r1=1448197&r2=1448198&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java	(original)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpSimpleConsumeTest.java	Wed Feb 20 14:47:06 2013
@@ -40,7 +40,8 @@ public class SftpSimpleConsumeTest exten
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
-
+        mock.expectedBodiesReceived(expected);
+        
         context.startRoute("foo");
 
         assertMockEndpointsSatisfied();



Mime
View raw message