camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r782544 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/ test/java/org/apache/camel/component/file/stress/
Date Mon, 08 Jun 2009 07:03:05 GMT
Author: davsclaus
Date: Mon Jun  8 07:03:05 2009
New Revision: 782544

URL: http://svn.apache.org/viewvc?rev=782544&view=rev
Log:
CAMEL-1670: Fixed problem with in progress detection of files for file consumer.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
      - copied, changed from r782538, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressManually.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=782544&r1=782543&r2=782544&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Mon Jun  8 07:03:05 2009
@@ -64,8 +64,10 @@
                 }
             } else if (file.isFile()) {
                 if (isValidFile(gf, false)) {
-                    if (isInProgress(file, files)) {
-                        log.trace("Skipping as file is already in progress: " + file);
+                    if (isInProgress(gf)) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Skipping as file is already in progress: " + gf.getFileName());
+                        }
                     } else {
                         // matched file so add
                         fileList.add(gf);
@@ -78,27 +80,6 @@
     }
 
     /**
-     * Is the given file already in progress.
-     *
-     * @param target  the target file
-     * @param files   the list of files found in the current directory
-     * @return <tt>true</tt> if the file is already in progress
-     */
-    protected boolean isInProgress(File target, File[] files) {
-        for (File file : files) {
-            String name = file.getName();
-            if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
-                String before = ObjectHelper.before(name, FileComponent.DEFAULT_LOCK_FILE_POSTFIX);
-                if (target.getName().equals(before)) {
-                    return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
-    /**
      * Creates a new GenericFile<File> based on the given file.
      *
      * @param endpointPath the starting directory the endpoint was configued with

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=782544&r1=782543&r2=782544&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Mon Jun  8 07:03:05 2009
@@ -121,6 +121,13 @@
             // process the current exchange
             processExchange(exchange);
         }
+
+        // remove the file from the in progress list in case the batch was limited by max messages per poll
+        for (int index = 0; index < exchanges.size() && isRunAllowed(); index++) {
+            GenericFileExchange<T> exchange = (GenericFileExchange<T>) exchanges.poll();
+            String key = exchange.getGenericFile().getFileName();
+            endpoint.getInProgressRepository().remove(key);
+        }
     }
 
     /**
@@ -231,7 +238,7 @@
      * </ul>
      * And then <tt>true</tt> for directories.
      *
-     * @param file        the remote file
+     * @param file        the file
      * @param isDirectory wether the file is a directory or a file
      * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
      */
@@ -284,6 +291,17 @@
         return true;
     }
 
+    /**
+     * Is the given file already in progress.
+     *
+     * @param file the file
+     * @return <tt>true</tt> if the file is already in progress
+     */
+    protected boolean isInProgress(GenericFile<T> file) {
+        String key = file.getFileName();
+        return !endpoint.getInProgressRepository().add(key);
+    }
+
     private void evaluteFileExpression() {
         if (fileExpressionResult == null) {
             // create a dummy exchange as Exchange is needed for expression evaluation
@@ -292,5 +310,4 @@
         }
     }
 
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java?rev=782544&r1=782543&r2=782544&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
Mon Jun  8 07:03:05 2009
@@ -30,6 +30,7 @@
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.Language;
@@ -52,6 +53,7 @@
     protected GenericFileProcessStrategy<T> processStrategy;
     protected GenericFileConfiguration configuration;
 
+    protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository();
     protected String localWorkDirectory;
     protected boolean autoCreate = true;
     protected int bufferSize = 128 * 1024;
@@ -393,6 +395,14 @@
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
+    public IdempotentRepository<String> getInProgressRepository() {
+        return inProgressRepository;
+    }
+
+    public void setInProgressRepository(IdempotentRepository<String> inProgressRepository) {
+        this.inProgressRepository = inProgressRepository;
+    }
+
     /**
      * Configures the given message with the file which sets the body to the
      * file object.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java?rev=782544&r1=782543&r2=782544&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
Mon Jun  8 07:03:05 2009
@@ -92,8 +92,10 @@
             if (!committed) {
                 processStrategyRollback(processStrategy, exchange, file);
             }
-        }
 
+            // remove file from the in progress list as its no longer in progress
+            endpoint.getInProgressRepository().remove(file.getFileName());
+        }
     }
 
     /**

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
(from r782538, camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressManually.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressManually.java&r1=782538&r2=782544&rev=782544&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressManually.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/stress/FileAsyncStressTest.java
Mon Jun  8 07:03:05 2009
@@ -27,13 +27,22 @@
 /**
  * @version $Revision$
  */
-public class FileAsyncStressManually extends ContextTestSupport {
+public class FileAsyncStressTest extends ContextTestSupport {
 
-    public void testAsyncStress() throws Exception {
-        // test by starting the unit test FileAsyncStressFileDropper in another JVM
+    private int files = 150;
 
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteDirectory("target/filestress");
+        for (int i = 0; i < files; i++) {
+            template.sendBodyAndHeader("file:target/filestress", "Hello World", Exchange.FILE_NAME, i + ".txt");
+        }
+    }
+
+    public void testAsyncStress() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMinimumMessageCount(250);
+        mock.expectedMinimumMessageCount(100);
 
         assertMockEndpointsSatisfied();
     }
@@ -43,7 +52,10 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("file:target/filestress?readLock=markerFile&maxMessagesPerPoll=25&move=backup")
+                // leverage the fact that we can limit to max 25 files per poll
+                // this will result in polling again and potentially picking up files
+                // that already are in progress
+                from("file:target/filestress?maxMessagesPerPoll=50")
                     .threads(10)
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
@@ -57,4 +69,4 @@
         };
     }
 
-}
+}
\ No newline at end of file



Mime
View raw message