camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-7565: SFTP using PollEnrich with disconnect=true and delete=true does NOT delete the file.
Date Sat, 21 Mar 2015 11:11:34 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 8e9b48820 -> adf655d54
  refs/heads/master 8c6de215e -> 5224a13e5


CAMEL-7565: SFTP using PollEnrich with disconnect=true and delete=true does NOT delete the
file.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5224a13e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5224a13e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5224a13e

Branch: refs/heads/master
Commit: 5224a13e5257fcf289e5b95f65a5baaeb67ddecf
Parents: 8c6de21
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sat Mar 21 12:12:36 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sat Mar 21 12:12:36 2015 +0100

----------------------------------------------------------------------
 .../file/remote/RemoteFileConsumer.java         | 39 ++++++++--
 ...nrichConsumeWithDisconnectAndDeleteTest.java | 78 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5224a13e/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index bb418e2..4a4ba2d 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -20,17 +20,19 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Ordered;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileConsumer;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.support.SynchronizationAdapter;
 
 /**
  * Base class for remote file consumers.
  */
 public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
-    protected boolean loggedIn;
-    protected boolean loggedInWarning;
+    protected transient boolean loggedIn;
+    protected transient boolean loggedInWarning;
 
     public RemoteFileConsumer(RemoteFileEndpoint<T> endpoint, Processor processor,
RemoteFileOperations<T> operations) {
         super(endpoint, processor, operations);
@@ -87,10 +89,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T>
{
         if (log.isTraceEnabled()) {
             log.trace("postPollCheck on " + getEndpoint().getConfiguration().remoteServerInformation());
         }
-        if (getEndpoint().isDisconnect()) {
-            log.trace("postPollCheck disconnect from: {}", getEndpoint());
-            disconnect();
-        }
     }
 
     @Override
@@ -98,6 +96,35 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T>
{
         // mark the exchange to be processed synchronously as the ftp client is not thread
safe
         // and we must execute the callbacks in the same thread as this consumer
         exchange.setProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC, Boolean.TRUE);
+
+        // defer disconnect til the UoW is complete - but only the last exchange from the
batch should do that
+        boolean isLast = exchange.getProperty(Exchange.BATCH_COMPLETE, true, Boolean.class);
+        if (isLast && getEndpoint().isDisconnect()) {
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onDone(Exchange exchange) {
+                    log.trace("postPollCheck disconnect from: {}", getEndpoint());
+                    disconnect();
+                }
+
+                @Override
+                public boolean allowHandover() {
+                    // do not allow handover as we must execute the callbacks in the same
thread as this consumer
+                    return false;
+                }
+
+                @Override
+                public int getOrder() {
+                    // we want to disconnect last
+                    return Ordered.LOWEST;
+                }
+
+                public String toString() {
+                    return "Disconnect";
+                }
+            });
+        }
+
         return super.processExchange(exchange);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/5224a13e/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
new file mode 100644
index 0000000..f547b15
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/SftpPollEnrichConsumeWithDisconnectAndDeleteTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.remote.sftp.SftpServerTestSupport;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class SftpPollEnrichConsumeWithDisconnectAndDeleteTest extends SftpServerTestSupport
{
+
+    @Test
+    public void testSftpSimpleConsume() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        String expected = "Hello World";
+
+        // create file using regular file
+        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME,
"hello.txt");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+        mock.expectedBodiesReceived(expected);
+
+        ProducerTemplate triggerTemplate = context.createProducerTemplate();
+        triggerTemplate.sendBody("vm:trigger", "");
+
+        assertMockEndpointsSatisfied();
+
+        long startFileDeletionCheckTime = System.currentTimeMillis();
+        boolean fileExists = true;
+        while (System.currentTimeMillis() - startFileDeletionCheckTime < 3000) {  // wait
up to 3000ms for file to be deleted
+            File file = new File(FTP_ROOT_DIR + "/hello.txt");
+            fileExists = file.exists();
+
+            if (fileExists) {
+                log.info("Will check that file has been deleted again in 200ms");
+                Thread.sleep(200);
+            }
+        }
+
+        assertFalse("The file should have been deleted", fileExists);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                    .pollEnrich("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delay=10s&disconnect=true&delete=true")
+                    .routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file


Mime
View raw message