camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r680913 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/ components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/
Date Wed, 30 Jul 2008 05:53:15 GMT
Author: davsclaus
Date: Tue Jul 29 22:53:15 2008
New Revision: 680913

URL: http://svn.apache.org/viewvc?rev=680913&view=rev
Log:
CAMEL-654 and CAMEL-760: Option exclusiveRead renamed to exclusiveReadLock and is now default
true.

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
    activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
    activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Tue Jul 29 22:53:15 2008
@@ -51,7 +51,7 @@
     private boolean generateEmptyExchangeWhenIdle;
     private boolean recursive = true;
     private String regexPattern = "";
-    private boolean exclusiveRead;
+    private boolean exclusiveReadLock = true;
 
     public FileConsumer(final FileEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -60,6 +60,8 @@
 
     protected synchronized void poll() throws Exception {
         int rc = pollFileOrDirectory(endpoint.getFile(), isRecursive());
+
+        // if no files consumes and using generateEmptyExchangeWhenIdle option then process
an empty exchange 
         if (rc == 0 && generateEmptyExchangeWhenIdle) {
             final FileExchange exchange = endpoint.createExchange((File)null);
             getAsyncProcessor().process(exchange, new AsyncCallback() {
@@ -67,6 +69,7 @@
                 }
             });
         }
+
         lastPollTime = System.currentTimeMillis();
     }
 
@@ -79,11 +82,15 @@
      */
     protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
         if (!fileOrDirectory.isDirectory()) {
-            return pollFile(fileOrDirectory); // process the file
+            // process the file
+            return pollFile(fileOrDirectory);
         } else if (processDir) {
+            // directory that can be recursive
             int rc = 0;
             if (isValidFile(fileOrDirectory)) {
-                LOG.debug("Polling directory " + fileOrDirectory);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Polling directory " + fileOrDirectory);
+                }
                 File[] files = fileOrDirectory.listFiles();
                 for (File file : files) {
                     rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
@@ -91,7 +98,9 @@
             }
             return rc;
         } else {
-            LOG.debug("Skipping directory " + fileOrDirectory);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Skipping directory " + fileOrDirectory);
+            }
             return 0;
         }
     }
@@ -103,6 +112,9 @@
      * @return returns 1 if the file was processed, 0 otherwise.
      */
     protected int pollFile(final File file) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling file: " + file);
+        }
 
         if (!file.exists()) {
             return 0;
@@ -124,8 +136,8 @@
         endpoint.configureMessage(file, exchange.getIn());
         try {
             // is we use excluse read then acquire the exclusive read (waiting until we got
it)
-            if (exclusiveRead) {
-                acquireExclusiveRead(file);
+            if (exclusiveReadLock) {
+                acquireExclusiveReadLock(file);
             }
 
             if (LOG.isDebugEnabled()) {
@@ -158,7 +170,7 @@
 
             } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(endpoint + " cannot process file: " + file);
+                    LOG.debug(endpoint + " can not process file: " + file);
                 }
             }
         } catch (Throwable e) {
@@ -168,23 +180,28 @@
         return 1;
     }
 
-    protected void acquireExclusiveRead(File file) throws IOException {
+    /**
+     * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
+     * After granting the read lock it is realeased, we just want to make sure that when
we start
+     * consuming the file its not currently in progress of being written by third party.
+     */
+    protected void acquireExclusiveReadLock(File file) throws IOException {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for exclusive lock to file: " + file);
+            LOG.trace("Waiting for exclusive read lock to file: " + file);
         }
 
         // try to acquire rw lock on the file before we can consume it
         FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
         try {
             FileLock lock = channel.lock();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Acquired exclusive lock: " + lock + " to file: " + file);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Acquired exclusive read lock: " + lock + " to file: " + file);
             }
             // just release it now we dont want to hold it during the rest of the processing
             lock.release();
         } finally {
             // must close channel
-            ObjectHelper.close(channel, "FileConsumer during acquiring of exclusive lock",
LOG);
+            ObjectHelper.close(channel, "FileConsumer during acquiring of exclusive read
lock", LOG);
         }
     }
 
@@ -341,11 +358,11 @@
         this.unchangedSize = unchangedSize;
     }
 
-    public boolean isExclusiveRead() {
-        return exclusiveRead;
+    public boolean isExclusiveReadLock() {
+        return exclusiveReadLock;
     }
 
-    public void setExclusiveRead(boolean exclusiveRead) {
-        this.exclusiveRead = exclusiveRead;
+    public void setExclusiveReadLock(boolean exclusiveReadLock) {
+        this.exclusiveReadLock = exclusiveReadLock;
     }
 }

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Tue Jul 29 22:53:15 2008
@@ -128,7 +128,7 @@
                     pollDirectory(getFullFileName(ftpFile));
                 }
             } else {
-                LOG.debug("Unsupported type of FTPFile: " + ftpFile + " (not a file or directory).
Is skipped.");
+                LOG.debug("Unsupported type of FTPFile: " + ftpFile + " (not a file or directory).
It is skipped.");
             }
         }
 
@@ -155,8 +155,8 @@
             String fullFileName = getFullFileName(ftpFile);
 
             // is we use excluse read then acquire the exclusive read (waiting until we got
it)
-            if (exclusiveRead) {
-                acquireExclusiveRead(client, ftpFile);
+            if (exclusiveReadLock) {
+                acquireExclusiveReadLock(client, ftpFile);
             }
 
             // retrieve the file
@@ -188,7 +188,7 @@
                 boolean deleted = client.deleteFile(ftpFile.getName());
                 if (!deleted) {
                     // ignore just log a warning
-                    LOG.warn("Could not delete file: " + ftpFile.getName() + " from: " +
remoteServer());
+                    LOG.warn("Can not delete file: " + ftpFile.getName() + " from: " + remoteServer());
                 }
             } else if (isMoveFile()) {
                 String fromName = ftpFile.getName();
@@ -206,7 +206,7 @@
                     if (lastPathIndex != -1) {
                         String directory = toName.substring(0, lastPathIndex);
                         if (!FtpUtils.buildDirectory(client, directory)) {
-                            LOG.warn("Couldn't build directory: " + directory + " (could
be because of denied permissions)");
+                            LOG.warn("Can not build directory: " + directory + " (maybe because
of denied permissions)");
                         }
                     }
                 }
@@ -214,7 +214,7 @@
                 // try to rename
                 boolean success = client.rename(fromName, toName);
                 if (!success) {
-                    LOG.warn("Could not move file: " + fromName + " to: " + toName);
+                    LOG.warn("Can not move file: " + fromName + " to: " + toName);
                 }
             }
 
@@ -222,26 +222,26 @@
         }
     }
 
-    protected void acquireExclusiveRead(FTPClient client, FTPFile ftpFile) throws IOException
{
+    protected void acquireExclusiveReadLock(FTPClient client, FTPFile ftpFile) throws IOException
{
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for exclusive lock to file: " + ftpFile);
+            LOG.trace("Waiting for exclusive read lock to file: " + ftpFile);
         }
 
         // the trick is to try to rename the file, if we can rename then we have exclusive
read
         // since its a remote file we can not use java.nio to get a RW lock
         String originalName = ftpFile.getName();
-        String newName = originalName + ".camelExclusiveRead";
+        String newName = originalName + ".camelExclusiveReadLock";
         boolean exclusive = false;
         while (!exclusive) {
             exclusive = client.rename(originalName, newName);
             if (exclusive) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Acquired exclusive lock to file: " + originalName);
+                    LOG.debug("Acquired exclusive read lock to file: " + originalName);
                 }
                 // rename it back so we can read it
                 client.rename(newName, originalName);
             } else {
-                LOG.trace("Exclusive lock not granted. Sleeping for 1000 millis.");
+                LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis.");
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Tue Jul 29 22:53:15 2008
@@ -31,7 +31,7 @@
     protected boolean recursive = true;
     protected String regexPattern;
     protected boolean setNames = true;
-    protected boolean exclusiveRead = true;
+    protected boolean exclusiveReadLock = true;
     protected boolean deleteFile;
     protected String moveNamePrefix;
     protected String moveNamePostfix;
@@ -132,12 +132,12 @@
         this.setNames = setNames;
     }
 
-    public boolean isExclusiveRead() {
-        return exclusiveRead;
+    public boolean isExclusiveReadLock() {
+        return exclusiveReadLock;
     }
 
-    public void setExclusiveRead(boolean exclusiveRead) {
-        this.exclusiveRead = exclusiveRead;
+    public void setExclusiveReadLock(boolean exclusiveReadLock) {
+        this.exclusiveReadLock = exclusiveReadLock;
     }
 
     public boolean isDeleteFile() {

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Tue Jul 29 22:53:15 2008
@@ -169,8 +169,8 @@
             String fullFileName = getFullFileName(sftpFile);
 
             // is we use excluse read then acquire the exclusive read (waiting until we got
it)
-            if (exclusiveRead) {
-                acquireExclusiveRead(sftpFile);
+            if (exclusiveReadLock) {
+                acquireExclusiveReadLock(sftpFile);
             }
 
             // retrieve the file
@@ -215,7 +215,7 @@
                     if (lastPathIndex != -1) {
                         String directory = toName.substring(0, lastPathIndex);
                         if (!SftpUtils.buildDirectory(channel, directory)) {
-                            LOG.warn("Couldn't build directory: " + directory + " (could
be because of denied permissions)");
+                            LOG.warn("Can not build directory: " + directory + " (maybe because
of denied permissions)");
                         }
                     }
                 }
@@ -225,7 +225,7 @@
                     channel.rename(fromName, toName);
                 } catch (SftpException e) {
                     // ignore just log a warning
-                    LOG.warn("Could not move file: " + fromName + " to: " + toName);
+                    LOG.warn("Can not move file: " + fromName + " to: " + toName);
                 }
             }
 
@@ -244,15 +244,15 @@
         }
     }
 
-    protected void acquireExclusiveRead(ChannelSftp.LsEntry sftpFile) throws SftpException
{
+    protected void acquireExclusiveReadLock(ChannelSftp.LsEntry sftpFile) throws SftpException
{
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for exclusive lock to file: " + sftpFile);
+            LOG.trace("Waiting for exclusive read lock to file: " + sftpFile);
         }
 
         // the trick is to try to rename the file, if we can rename then we have exclusive
read
         // since its a remote file we can not use java.nio to get a RW access
         String originalName = sftpFile.getFilename();
-        String newName = originalName + ".camelExclusiveRead";
+        String newName = originalName + ".camelExclusiveReadLock";
         boolean exclusive = false;
         while (!exclusive) {
             try {
@@ -264,12 +264,12 @@
 
             if (exclusive) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Acquired exclusive lock to file: " + originalName);
+                    LOG.debug("Acquired exclusive read lock to file: " + originalName);
                 }
                 // rename it back so we can read it
                 channel.rename(newName, originalName);
             } else {
-                LOG.trace("Exclusive lock not granted. Sleeping for 1000 millis");
+                LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis");
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {

Modified: activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Tue Jul 29 22:53:15 2008
@@ -20,7 +20,6 @@
 import java.io.FileOutputStream;
 
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -32,7 +31,7 @@
     private static final Log LOG = LogFactory.getLog(FromFtpExclusiveReadTest.class);
 
     private String port = "20090";
-    private String ftpUrl = "ftp://admin@localhost:" + port + "/slowfile?password=admin&consumer.exclusiveRead=true&consumer.delay=500";
+    private String ftpUrl = "ftp://admin@localhost:" + port + "/slowfile?password=admin&consumer.exclusiveReadLock=true&consumer.delay=500";
 
     public String getPort() {
         return port;

Modified: activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
Tue Jul 29 22:53:15 2008
@@ -17,7 +17,6 @@
 package org.apache.camel.component.file.remote;
 
 import java.io.File;
-import java.io.FileOutputStream;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -32,7 +31,7 @@
     private static final Log LOG = LogFactory.getLog(FromFtpExclusiveReadTest.class);
 
     private String port = "20020";
-    private String ftpUrl = "ftp://admin@localhost:" + port + "/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
+    private String ftpUrl = "ftp://admin@localhost:" + port + "/slowfile?password=admin&binary=false&consumer.exclusiveReadLock=true&consumer.delay=500";
 
     public String getPort() {
         return port;

Modified: activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
Tue Jul 29 22:53:15 2008
@@ -32,7 +32,7 @@
     private static final Log LOG = LogFactory.getLog(FromFtpExclusiveReadTest.class);
 
     private String port = "20027";
-    private String ftpUrl = "ftp://admin@localhost:" + port + "/slowfile?password=admin&consumer.exclusiveRead=false&consumer.delay=500";
+    private String ftpUrl = "ftp://admin@localhost:" + port + "/slowfile?password=admin&consumer.exclusiveReadLock=false&consumer.delay=500";
 
     public String getPort() {
         return port;



Mime
View raw message