camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r680002 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/test/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/
Date Sat, 26 Jul 2008 14:48:08 GMT
Author: davsclaus
Date: Sat Jul 26 07:48:07 2008
New Revision: 680002

URL: http://svn.apache.org/viewvc?rev=680002&view=rev
Log:
CAMEL-760: FileConsumer added option consumer.exclusiveRead to avoid polling files that is
in progress of being written. Beware this options is default.

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680002&r1=680001&r2=680002&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Sat Jul 26 07:48:07 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.AsyncCallback;
@@ -39,14 +40,14 @@
     private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File,
Long>();
     private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File,
Long>();
 
-    private boolean generateEmptyExchangeWhenIdle;
-    private boolean recursive = true;
-    private String regexPattern = "";
-
     private long lastPollTime;
     private int unchangedDelay;
     private boolean unchangedSize;
 
+    private boolean generateEmptyExchangeWhenIdle;
+    private boolean recursive = true;
+    private String regexPattern = "";
+    private boolean exclusiveRead = true;
 
     public FileConsumer(final FileEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -118,6 +119,11 @@
 
         endpoint.configureMessage(file, exchange.getIn());
         try {
+            // is we use excluse read then acquire the exclusive read (waiting until we got
it)
+            if (exclusiveRead) {
+                acquireExclusiveRead(file);
+            }
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("About to process file: " + file + " using exchange: " + exchange);
             }
@@ -158,6 +164,36 @@
         return 1;
     }
 
+    protected void acquireExclusiveRead(File file) throws IOException {
+        LOG.trace("Acquiring exclusive read (avoid reading file that is in progress of being
written)");
+
+        // the trick is to try to rename the file, if we can rename then we have exclusive
read
+        // NOTE: using java.nio (channel lokc) doesn't help us as we can have write access
but the
+        // file is still in progress of being written (slow writer)
+        String originalName = file.getAbsolutePath();
+        File newName = new File(originalName + ".exclusiveRead");
+        boolean exclusive = false;
+        while (! exclusive) {
+            exclusive = file.renameTo(newName);
+            if (exclusive) {
+                LOG.trace("Got it renaming it back to original name");
+                // rename it back
+                newName.renameTo(file);
+            } else {
+                LOG.trace("Exclusive read not granted. Sleeping for 1000 millis");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Acquired exclusive read to: " + file);
+        }
+    }
+
+
     /**
      * Strategy when the file was processed and a commit should be executed.
      *
@@ -311,4 +347,11 @@
         this.unchangedSize = unchangedSize;
     }
 
+    public boolean isExclusiveRead() {
+        return exclusiveRead;
+    }
+
+    public void setExclusiveRead(boolean exclusiveRead) {
+        this.exclusiveRead = exclusiveRead;
+    }
 }

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java?rev=680002&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
(added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Sat Jul 26 07:48:07 2008
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Unit test to verify exclusive read - that we do not poll files that is in progress of
being written.
+ */
+public class FileExclusiveReadTest extends ContextTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(FileExclusiveReadTest.class);
+
+    private String fileUrl = "file://target/exclusiveread/slowfile?consumer.delay=500&consumer.exclusiveRead=true";
+
+    @Override
+    protected void setUp() throws Exception {
+        disableJMX();
+        super.setUp();
+    }
+
+    public void testPoolIn3SecondsButNoFiles() throws Exception {
+        deleteDirectory("./target/exclusiveread");
+        createDirectory("./target/exclusiveread/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Thread.sleep(3 * 1000L);
+
+        mock.assertIsSatisfied();
+    }
+
+    public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+        deleteDirectory("./target/exclusiveread");
+        createDirectory("./target/exclusiveread/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye World");
+
+        createSlowFile();
+
+        mock.assertIsSatisfied();
+    }
+
+    private void createSlowFile() throws Exception {
+        LOG.info("Creating a slow file ...");
+        File file = new File("./target/exclusiveread/slowfile/hello.txt");
+        FileOutputStream fos = new FileOutputStream(file);
+        fos.write("Hello World".getBytes());
+        for (int i = 0; i < 3; i++) {
+            Thread.sleep(1000);
+            fos.write(("Line #" + i).getBytes());
+            LOG.info("Appending to slowfile");
+        }
+        fos.write("Bye World".getBytes());
+        fos.close();
+        LOG.info("... done creating slowfile");
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(fileUrl).to("mock:result");
+            }
+        };
+    }
+
+    private static void createDirectory(String s) {
+        File file = new File(s);
+        file.mkdirs();
+    }
+
+}

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=680002&r1=680001&r2=680002&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Sat Jul 26 07:48:07 2008
@@ -174,7 +174,7 @@
         // the trick is to try to rename the file, if we can rename then we have exclusive
read
         // since its a remote file we can not use java.nio to get a RW access
         String originalName = ftpFile.getName();
-        String newName = originalName + ".camel";
+        String newName = originalName + ".exclusiveRead";
         boolean exclusive = false;
         while (! exclusive) {
             exclusive = client.rename(originalName, newName);

Modified: activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=680002&r1=680001&r2=680002&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++ activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Sat Jul 26 07:48:07 2008
@@ -180,7 +180,7 @@
         // the trick is to try to rename the file, if we can rename then we have exclusive
read
         // since its a remote file we can not use java.nio to get a RW access
         String originalName = sftpFile.getFilename();
-        String newName = originalName + ".camel";
+        String newName = originalName + "..exclusiveRead";
         boolean exclusive = false;
         while (! exclusive) {
             try {



Mime
View raw message