camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [01/10] git commit: CAMEL-6936: Fixed file/ftp consumer when idempotent=true may not detect file changed as new file, due the file was regarded as still in-progress.
Date Thu, 21 Nov 2013 12:56:24 GMT
Updated Branches:
  refs/heads/camel-2.11.x da43e9c71 -> e1d7bc81b
  refs/heads/camel-2.12.x dcaa5d782 -> 9c980dda2
  refs/heads/master 37e0e6bb8 -> 2c9d84a7c


CAMEL-6936: Fixed file/ftp consumer which, when idempotent=true, may not detect a changed file as a new
file, because the file was regarded as still in-progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5696ea9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5696ea9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5696ea9

Branch: refs/heads/master
Commit: a5696ea924e26aee4ce76d5056458d5abf42a811
Parents: 37e0e6b
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Thu Nov 21 09:11:27 2013 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Nov 21 11:02:03 2013 +0100

----------------------------------------------------------------------
 .../component/file/GenericFileConsumer.java     | 48 +++++++--------
 ...eConsumerIdempotentKeyChangedIssue2Test.java | 63 ++++++++++++++++++++
 2 files changed, 87 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a5696ea9/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 02130d2..2b86c7f 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -471,45 +471,43 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
      * @return <tt>true</tt> to include the file, <tt>false</tt>
to skip it
      */
     protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T>
files) {
+        String absoluteFilePath = file.getAbsoluteFilePath();
+
         if (!isMatched(file, isDirectory, files)) {
             log.trace("File did not match. Will skip this file: {}", file);
             return false;
         }
 
-        // if its a file then check if its already in progress
-        if (!isDirectory && isInProgress(file)) {
+        // directory is always valid
+        if (isDirectory) {
+            return true;
+        }
+
+        // check if file is already in progress
+        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
             if (log.isTraceEnabled()) {
                 log.trace("Skipping as file is already in progress: {}", file.getFileName());
             }
             return false;
         }
 
-        boolean answer = true;
-        String key = null;
-        try {
-            // if its a file then check we have the file in the idempotent registry already
-            if (!isDirectory && endpoint.isIdempotent()) {
-                // use absolute file path as default key, but evaluate if an expression key
was configured
-                key = file.getAbsoluteFilePath();
-                if (endpoint.getIdempotentKey() != null) {
-                    Exchange dummy = endpoint.createExchange(file);
-                    key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
-                }
-                if (key != null && endpoint.getIdempotentRepository().contains(key))
{
-                    log.trace("This consumer is idempotent and the file has been consumed
before. Will skip this file: {}", file);
-                    answer = false;
-                }
+        // if its a file then check we have the file in the idempotent registry already
+        if (endpoint.isIdempotent()) {
+            // use absolute file path as default key, but evaluate if an expression key was
configured
+            String key = file.getAbsoluteFilePath();
+            if (endpoint.getIdempotentKey() != null) {
+                Exchange dummy = endpoint.createExchange(file);
+                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
             }
-        } finally {
-            // ensure to run this in finally block in case of runtime exceptions being thrown
-            if (!answer) {
-                // remove file from the in progress list as its no longer in progress
-                endpoint.getInProgressRepository().remove(key);
+            if (key != null && endpoint.getIdempotentRepository().contains(key))
{
+                log.trace("This consumer is idempotent and the file has been consumed before
matching idempotentKey: {}. Will skip this file: {}", key, file);
+                return false;
             }
         }
 
-        // file matched
-        return answer;
+        // final step: atomically add the file as in-progress, so we are the
+        // only thread processing this file
+        return endpoint.getInProgressRepository().add(absoluteFilePath);
     }
 
     /**
@@ -614,7 +612,9 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
      *
      * @param file the file
      * @return <tt>true</tt> if the file is already in progress
+     * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()}
instead.
      */
+    @Deprecated
     protected boolean isInProgress(GenericFile<T> file) {
         String key = file.getAbsoluteFilePath();
         // must use add, to have operation as atomic

http://git-wip-us.apache.org/repos/asf/camel/blob/a5696ea9/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyChangedIssue2Test.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyChangedIssue2Test.java
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyChangedIssue2Test.java
new file mode 100644
index 0000000..57569b2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentKeyChangedIssue2Test.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+
+public class FileConsumerIdempotentKeyChangedIssue2Test extends ContextTestSupport {
+
+    private Endpoint endpoint;
+
+    public void testFile() throws Exception {
+        getMockEndpoint("mock:file").expectedBodiesReceived("Hello World");
+
+        template.sendBodyAndHeader(endpoint, "Hello World", Exchange.FILE_NAME, "hello.txt");
+        assertMockEndpointsSatisfied();
+        oneExchangeDone.matches(5, TimeUnit.SECONDS);
+
+        resetMocks();
+        getMockEndpoint("mock:file").expectedBodiesReceived("Hello World Again");
+
+        // wait a bit to allow the consumer to poll once and see a non-changed file
+        Thread.sleep(250);
+
+        template.sendBodyAndHeader(endpoint, "Hello World Again", Exchange.FILE_NAME, "hello.txt");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                endpoint = endpoint("file:target/changed?noop=true&delay=100"
+                        + "&idempotentKey=${file:name}-${file:size}-${file:modified}");
+
+                from(endpoint)
+                    .convertBodyTo(String.class)
+                    .to("log:file")
+                    .to("mock:file");
+            }
+        };
+    }
+}


Mime
View raw message