camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r680093 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/FileConsumer.java test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Date Sun, 27 Jul 2008 12:31:26 GMT
Author: davsclaus
Date: Sun Jul 27 05:31:26 2008
New Revision: 680093

URL: http://svn.apache.org/viewvc?rev=680093&view=rev
Log:
CAMEL-760: Using java.nio for exclusive lock in FileConsumer

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680093&r1=680092&r2=680093&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Sun Jul 27 05:31:26 2008
@@ -18,12 +18,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -165,30 +169,21 @@
     }
 
     protected void acquireExclusiveRead(File file) throws IOException {
-        LOG.trace("Acquiring exclusive read (avoid reading file that is in progress of being
written)");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Acquiring exclusive read (avoid reading file that is in progress of
being written) to " + file);
+        }
 
-        // the trick is to try to rename the file, if we can rename then we have exclusive
read
-        // NOTE: using java.nio (channel lokc) doesn't help us as we can have write access
but the
-        // file is still in progress of being written (slow writer)
-        // TODO: Seems to not work on Unix boxes (see the unit test FileExclusiveReadTest)
-        String originalName = file.getAbsolutePath();
-        File newName = new File(originalName + ".camelExclusiveRead");
-        boolean exclusive = false;
-        while (! exclusive) {
-            exclusive = file.renameTo(newName);
-            if (exclusive) {
-                LOG.trace("Got it renaming it back to original name");
-                // rename it back
-                newName.renameTo(file);
-            } else {
-                LOG.trace("Exclusive read not granted. Sleeping for 1000 millis.");
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    // ignore
-                }
-            }
+        // try to acquire rw lock on the file before we can consume it
+        FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
+        try {
+            FileLock lock = channel.lock();
+            // just release it now we dont want to hold it during the rest of the processing
+            lock.release();
+        } finally {
+            // must close channel
+            ObjectHelper.close(channel, "FileConsumer during acquiring of exclusive read",
LOG);
         }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("Acquired exclusive read to: " + file);
         }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java?rev=680093&r1=680092&r2=680093&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Sun Jul 27 05:31:26 2008
@@ -18,6 +18,8 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
@@ -51,13 +53,12 @@
         mock.assertIsSatisfied();
     }
 
-    // TODO: Fix me on Bamboo
-    public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
+    public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
         deleteDirectory("./target/exclusiveread");
         createDirectory("./target/exclusiveread/slowfile");
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye World");
+        mock.expectedBodiesReceived("Hello World");
 
         createSlowFile();
 
@@ -66,15 +67,24 @@
 
     private void createSlowFile() throws Exception {
         LOG.info("Creating a slow file ...");
+
         File file = new File("./target/exclusiveread/slowfile/hello.txt");
         FileOutputStream fos = new FileOutputStream(file);
-        fos.write("Hello World".getBytes());
-        for (int i = 0; i < 3; i++) {
-            Thread.sleep(1000);
-            fos.write(("Line #" + i).getBytes());
+
+        // get a lock so we are the only one working on this file
+        FileLock lock = fos.getChannel().lock();
+
+        byte[] buffer = "Hello World".getBytes();
+        ByteBuffer bb = ByteBuffer.wrap(buffer);
+        for (int i = 0; i < buffer.length; i++) {
             LOG.info("Appending to slowfile");
+            Thread.sleep(300);
         }
-        fos.write("Bye World".getBytes());
+        LOG.info("Writing to file");
+        fos.write(buffer);
+        LOG.info("Releasing lock");
+        lock.release();
+        LOG.info("Closing file");
         fos.close();
         LOG.info("... done creating slowfile");
     }



Mime
View raw message