camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r679990 - in /activemq/camel/trunk/components/camel-ftp/src: main/java/org/apache/camel/component/file/remote/ test/java/org/apache/camel/component/file/remote/ test/resources/
Date Sat, 26 Jul 2008 13:37:47 GMT
Author: davsclaus
Date: Sat Jul 26 06:37:46 2008
New Revision: 679990

URL: http://svn.apache.org/viewvc?rev=679990&view=rev
Log:
CAMEL-654: Introduced the exclusiveRead option to camel-ftp. This option is enabled by default and
fixes the problem that the ftp consumer could poll files that are still in the process of being written.
With this option the ftp consumer waits until the file has been completely written, so that it can
poll it safely. Beware: the option is enabled by default.

Added:
    activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Modified:
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
    activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Sat Jul 26 06:37:46 2008
@@ -39,6 +39,7 @@
     private boolean recursive = true;
     private String regexPattern;
     private boolean setNames = true;
+    private boolean exclusiveRead = true;
 
     public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client) {
         super(endpoint, processor);
@@ -46,27 +47,30 @@
         this.client = client;
     }
 
-    public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client, ScheduledExecutorService
executor) {
+    public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client,
+                       ScheduledExecutorService executor) {
         super(endpoint, processor, executor);
         this.endpoint = endpoint;
         this.client = client;
     }
 
     protected void connectIfNecessary() throws IOException {
-        // TODO: is there a way to avoid copy-pasting the reconnect logic?
         if (!client.isConnected()) {
-            LOG.warn("FtpConsumer's client isn't connected, trying to reconnect...");
+            LOG.debug("Not connected, trying to reconnect.");
             endpoint.connect(client);
-            LOG.info("Connected to " + endpoint.getConfiguration());
+            LOG.info("Connected to " + endpoint.getConfiguration().remoteServerInformation());
         }
     }
 
     protected void disconnect() throws IOException {
-        LOG.info("FtpConsumer's client is being explicitly disconnected");
+        LOG.debug("Disconnecting from " + endpoint.getConfiguration().remoteServerInformation());
         endpoint.disconnect(client);
     }
 
     protected void poll() throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling " + endpoint.getConfiguration());
+        }
         connectIfNecessary();
         // If the attempt to connect isn't successful, then the thrown
         // exception will signify that we couldn't poll
@@ -86,12 +90,12 @@
         } catch (FTPConnectionClosedException e) {
             // If the server disconnected us, then we must manually disconnect
             // the client before attempting to reconnect
-            LOG.warn("Disconnecting due to exception: " + e.toString());
+            LOG.warn("Disconnecting due to exception: " + e.getMessage());
             disconnect();
             // Rethrow to signify that we didn't poll
             throw e;
         } catch (RuntimeCamelException e) {
-            LOG.warn("Caught RuntimeCamelException: " + e.toString());
+            LOG.warn("Caught RuntimeCamelException: " + e.getMessage(), e);
             LOG.warn("Hoping an explicit disconnect/reconnect will solve the problem");
             disconnect();
             // Rethrow to signify that we didn't poll
@@ -111,8 +115,7 @@
                     pollDirectory(getFullFileName(ftpFile));
                 }
             } else {
-                // TODO: Type can be symbolic link etc. so what should we do?
-                LOG.warn("Unsupported type of FTPFile: " + ftpFile + " not a file or directory");
+                LOG.debug("Unsupported type of FTPFile: " + ftpFile + " (not a file or directory).
Is skipped.");
             }
         }
 
@@ -125,29 +128,71 @@
     }
 
     private void pollFile(FTPFile ftpFile) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling file: " + ftpFile);
+        }
+
+        long ts = ftpFile.getTimestamp().getTimeInMillis();
         // TODO do we need to adjust the TZ? can we?
-        if (ftpFile.getTimestamp().getTimeInMillis() > lastPollTime) {
-            if (isMatched(ftpFile)) {
-                String fullFileName = getFullFileName(ftpFile);
-                final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-                client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
-                RemoteFileExchange exchange = endpoint.createExchange(fullFileName, byteArrayOutputStream);
-
-                if (isSetNames()) {
-                    // set the filename in the special header filename marker to the ftp
filename
-                    String ftpBasePath = endpoint.getConfiguration().getFile();
-                    String relativePath = fullFileName.substring(ftpBasePath.length() + 1);
-                    relativePath = relativePath.replaceFirst("/", "");
-
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Setting exchange filename to " + relativePath);
-                    }
-                    exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, relativePath);
+        if (ts > lastPollTime && isMatched(ftpFile)) {
+            String remoteServer =  endpoint.getConfiguration().remoteServerInformation();
+            String fullFileName = getFullFileName(ftpFile);
+
+            // is we use excluse read then acquire the exclusive read (waiting until we got
it)
+            if (exclusiveRead) {
+                acquireExclusiveRead(client, ftpFile);
+            }
+
+            // retrieve the file
+            final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Retrieved file: " + ftpFile.getName() + " from: " + remoteServer);
+            }
+
+            RemoteFileExchange exchange = endpoint.createExchange(fullFileName, byteArrayOutputStream);
+
+            if (isSetNames()) {
+                // set the filename in the special header filename marker to the ftp filename
+                String ftpBasePath = endpoint.getConfiguration().getFile();
+                String relativePath = fullFileName.substring(ftpBasePath.length() + 1);
+                relativePath = relativePath.replaceFirst("/", "");
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Setting exchange filename to " + relativePath);
                 }
+                exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, relativePath);
+            }
 
-                getProcessor().process(exchange);
+            getProcessor().process(exchange);
+        }
+    }
+
+    protected void acquireExclusiveRead(FTPClient client, FTPFile ftpFile) throws IOException
{
+        LOG.trace("Acquiring exclusive read (avoid reading file that is in progress of being
written)");
+
+        // the trick is to try to rename the file, if we can rename then we have exclusive
read
+        // since its a remote file we can not use java.nio to get a RW access
+        String originalName = ftpFile.getName();
+        String newName = originalName + ".camel";
+        boolean exclusive = false;
+        while (! exclusive) {
+            exclusive = client.rename(originalName, newName);
+            if (exclusive) {
+                // rename it back so we can read it
+                client.rename(newName, originalName);
+            } else {
+                LOG.trace("Exclusive read not granted. Sleeping for 1000 millis");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
             }
         }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Acquired exclusive read to: " + originalName);
+        }
     }
 
     protected boolean isMatched(FTPFile file) {
@@ -189,4 +234,12 @@
     public void setSetNames(boolean setNames) {
         this.setNames = setNames;
     }
+
+    public boolean isExclusiveRead() {
+        return exclusiveRead;
+    }
+
+    public void setExclusiveRead(boolean exclusiveRead) {
+        this.exclusiveRead = exclusiveRead;
+    }
 }

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Sat Jul 26 06:37:46 2008
@@ -30,7 +30,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 public class SftpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
     private static final transient Log LOG = LogFactory.getLog(SftpConsumer.class);
     private final SftpEndpoint endpoint;
@@ -41,6 +40,7 @@
     private ChannelSftp channel;
     private Session session;
     private boolean setNames;
+    private boolean exclusiveRead = true;
 
     public SftpConsumer(SftpEndpoint endpoint, Processor processor, Session session) {
         super(endpoint, processor);
@@ -57,30 +57,32 @@
     protected void connectIfNecessary() throws JSchException {
         if (channel == null || !channel.isConnected()) {
             if (session == null || !session.isConnected()) {
-                LOG.info("Session isn't connected, trying to recreate and connect...");
+                LOG.debug("Session isn't connected, trying to recreate and connect.");
                 session = endpoint.createSession();
                 session.connect();
             }
-            LOG.info("Channel isn't connected, trying to recreate and connect...");
+            LOG.debug("Channel isn't connected, trying to recreate and connect.");
             channel = endpoint.createChannelSftp(session);
             channel.connect();
-            LOG.info("Connected to " + endpoint.getConfiguration().toString());
+            LOG.info("Connected to " + endpoint.getConfiguration().remoteServerInformation());
         }
     }
 
     protected void disconnect() throws JSchException {
         if (session != null) {
-            LOG.info("Session is being explicitly disconnected");
+            LOG.debug("Session is being explicitly disconnected");
             session.disconnect();
         }
         if (channel != null) {
-            LOG.info("Channel is being explicitly disconnected");
+            LOG.debug("Channel is being explicitly disconnected");
             channel.disconnect();
         }
     }
 
     protected void poll() throws Exception {
-        // TODO: is there a way to avoid copy-pasting the reconnect logic?
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling " + endpoint.getConfiguration());
+        }
         connectIfNecessary();
         // If the attempt to connect isn't successful, then the thrown
         // exception will signify that we couldn't poll
@@ -97,15 +99,15 @@
         } catch (JSchException e) {
             // If the connection has gone stale, then we must manually disconnect
             // the client before attempting to reconnect
-            LOG.warn("Disconnecting due to exception: " + e.toString());
+            LOG.warn("Disconnecting due to exception: " + e.getMessage());
             disconnect();
             // Rethrow to signify that we didn't poll
             throw e;
         } catch (SftpException e) {
             // Still not sure if/when these come up and what we should do about them
             // client.disconnect();
-            LOG.warn("Caught SftpException:" + e.toString());
-            LOG.warn("Doing nothing for now, need to determine an appropriate action");
+            LOG.warn("Caught SftpException:" + e.getMessage(), e);
+            LOG.warn("Hoping an explicit disconnect/reconnect will solve the problem");
             // Rethrow to signify that we didn't poll
             throw e;
         }
@@ -136,23 +138,73 @@
     }
 
     private void pollFile(ChannelSftp.LsEntry sftpFile) throws Exception {
-        if (sftpFile.getAttrs().getMTime() * 1000L > lastPollTime) {
-            if (isMatched(sftpFile)) {
-                final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-                channel.get(sftpFile.getFilename(), byteArrayOutputStream);
-                RemoteFileExchange exchange = endpoint.createExchange(getFullFileName(sftpFile),
byteArrayOutputStream);
-
-                if (isSetNames()) {
-                    String relativePath = getFullFileName(sftpFile).substring(endpoint.getConfiguration().getFile().length());
-                    if (relativePath.startsWith("/")) {
-                        relativePath = relativePath.substring(1);
-                    }
-                    exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, relativePath);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling file: " + sftpFile);
+        }
+
+        long ts = sftpFile.getAttrs().getMTime() * 1000L;
+
+        // TODO do we need to adjust the TZ? can we?
+        if (ts > lastPollTime && isMatched(sftpFile)) {
+            String remoteServer =  endpoint.getConfiguration().remoteServerInformation();
+
+            // is we use excluse read then acquire the exclusive read (waiting until we got
it)
+            if (exclusiveRead) {
+                acquireExclusiveRead(sftpFile);
+            }
+
+            // retrieve the file
+            final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            channel.get(sftpFile.getFilename(), byteArrayOutputStream);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Retrieved file: " + sftpFile.getFilename() + " from: " + remoteServer);
+            }
+
+            RemoteFileExchange exchange = endpoint.createExchange(getFullFileName(sftpFile),
byteArrayOutputStream);
+
+            if (isSetNames()) {
+                String relativePath = getFullFileName(sftpFile).substring(endpoint.getConfiguration().getFile().length());
+                if (relativePath.startsWith("/")) {
+                    relativePath = relativePath.substring(1);
                 }
+                exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, relativePath);
+            }
 
-                getProcessor().process(exchange);
+            getProcessor().process(exchange);
+        }
+    }
+
+    protected void acquireExclusiveRead(ChannelSftp.LsEntry sftpFile) throws SftpException
{
+        LOG.trace("Acquiring exclusive read (avoid reading file that is in progress of being
written)");
+
+        // the trick is to try to rename the file, if we can rename then we have exclusive
read
+        // since its a remote file we can not use java.nio to get a RW access
+        String originalName = sftpFile.getFilename();
+        String newName = originalName + ".camel";
+        boolean exclusive = false;
+        while (! exclusive) {
+            try {
+                channel.rename(originalName, newName);
+                exclusive = true;
+            } catch (SftpException e) {
+                // ignore we can not rename it
+            }
+
+            if (exclusive) {
+                // rename it back so we can read it
+                channel.rename(newName, originalName);
+            } else {
+                LOG.trace("Exclusive read not granted. Sleeping for 1000 millis");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
             }
         }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Acquired exclusive read to: " + originalName);
+        }
     }
 
     protected boolean isMatched(ChannelSftp.LsEntry sftpFile) {
@@ -194,4 +246,12 @@
     public void setSetNames(boolean setNames) {
         this.setNames = setNames;
     }
+
+    public boolean isExclusiveRead() {
+        return exclusiveRead;
+    }
+
+    public void setExclusiveRead(boolean exclusiveRead) {
+        this.exclusiveRead = exclusiveRead;
+    }
 }

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
Sat Jul 26 06:37:46 2008
@@ -46,7 +46,6 @@
     protected Session createSession() throws JSchException {
         final JSch jsch = new JSch();
         final Session session = jsch.getSession(getConfiguration().getUsername(), getConfiguration().getHost());
-        // TODO there's got to be a better way to deal with accepting new hosts...
         session.setUserInfo(new UserInfo() {
             public String getPassphrase() {
                 return null;

Added: activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=679990&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(added)
+++ activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Sat Jul 26 06:37:46 2008
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Unit test to verify exclusive read - that we do not poll files that is in progress of
being written.
+ */
+public class FromFtpExclusiveReadTest extends FtpServerTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(FromFtpExclusiveReadTest.class);
+
+    private String ftpUrl = "ftp://admin@localhost:" + getPort() + "/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
+
+    public String getPort() {
+        return "20019";
+    }
+
+    public void testPoolIn3SecondsButNoFiles() throws Exception {
+        deleteDirectory("./res/home");
+        createDirectory("./res/home/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Thread.sleep(3 * 1000L);
+
+        mock.assertIsSatisfied();
+    }
+
+    public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+        deleteDirectory("./res/home");
+        createDirectory("./res/home/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye World");
+
+        createSlowFile();
+
+        mock.assertIsSatisfied();
+    }
+
+    private void createSlowFile() throws Exception {
+        LOG.info("Creating a slow file ...");
+        File file = new File("./res/home/slowfile/hello.txt");
+        FileOutputStream fos = new FileOutputStream(file);
+        fos.write("Hello World".getBytes());
+        for (int i = 0; i < 3; i++) {
+            Thread.sleep(1000);
+            fos.write(("Line #" + i).getBytes());
+            LOG.info("Appending to slowfile");
+        }
+        fos.write("Bye World".getBytes());
+        fos.close();
+        LOG.info("... done creating slowfile");
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(ftpUrl).to("mock:result");
+            }
+        };
+    }
+
+    private static void createDirectory(String s) {
+        File file = new File(s);
+        file.mkdirs();
+    }
+}

Modified: activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties (original)
+++ activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties Sat Jul
26 06:37:46 2008
@@ -22,17 +22,17 @@
 
 # uncomment the following to enable camel debugging
 log4j.logger.org.apache.camel.component.file=DEBUG
-#log4j.logger.org.apache.mina=WARN
-#log4j.logger.org.apache.ftpserver=WARN
+log4j.logger.org.apache.mina=WARN
+log4j.logger.org.apache.ftpserver=WARN
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
-#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 
 # File appender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 log4j.appender.file.file=target/camel-ftp-test.log
\ No newline at end of file



Mime
View raw message