flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [6/9] git commit: FLUME-1505. TestFileChannel needs to be able to force a checkpoint.
Date Fri, 24 Aug 2012 17:23:23 GMT
FLUME-1505. TestFileChannel needs to be able to force a checkpoint.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ab7ccae3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ab7ccae3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ab7ccae3

Branch: refs/heads/flume-1.3.0
Commit: ab7ccae3253e23cb209d1527ba2a7c7b3d9fcb39
Parents: 4f42f04
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Aug 22 13:16:22 2012 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Aug 24 10:21:12 2012 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-file-channel/pom.xml       |    7 ++
 .../java/org/apache/flume/channel/file/Log.java    |    2 +-
 .../apache/flume/channel/file/TestFileChannel.java |   64 +++++----------
 3 files changed, 29 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ab7ccae3/flume-ng-channels/flume-file-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml
index cd882e5..62d80e3 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -81,6 +81,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.easytesting</groupId>
+      <artifactId>fest-reflect</artifactId>
+      <version>1.4</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/ab7ccae3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 9b13423..1e2706b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -647,7 +647,7 @@ class Log {
    * @param force  a flag to force the writing of checkpoint
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
-  private boolean writeCheckpoint(boolean force) throws Exception {
+  private Boolean writeCheckpoint(Boolean force) throws Exception {
     boolean checkpointCompleted = false;
     boolean lockAcquired = tryLockExclusive();
     if(!lockAcquired) {

http://git-wip-us.apache.org/repos/asf/flume/blob/ab7ccae3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 3dededf..bca2b17 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -17,7 +17,8 @@
  * under the License.
  */
 package org.apache.flume.channel.file;
-
+import static org.fest.reflect.core.Reflection.field;
+import static org.fest.reflect.core.Reflection.method;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -64,7 +65,6 @@ import com.google.common.io.Files;
 import com.google.common.io.Resources;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 
 public class TestFileChannel {
 
@@ -144,12 +144,7 @@ public class TestFileChannel {
         channel.take();
       }
     }).get();
-    long lastTake = System.currentTimeMillis();
-    File inflightsFile = new File(checkpointDir, "inflighttakes");
-
-    while (inflightsFile.lastModified() < lastTake) {
-      Thread.sleep(500);
-    }
+    forceCheckpoint(channel);
     channel.stop();
     //Simulate a sink, so separate thread.
     try {
@@ -240,11 +235,7 @@ public class TestFileChannel {
                 }
               }
             });
-    long lastPut = System.currentTimeMillis();
-    File checkpoint = new File(checkpointDir, "checkpoint");
-    while (checkpoint.lastModified() < lastPut) {
-      Thread.sleep(500);
-    }
+    forceCheckpoint(channel);
     tx.commit();
     tx.close();
     latch.countDown();
@@ -810,17 +801,13 @@ public class TestFileChannel {
     Transaction tx = channel.getTransaction();
     tx.begin();
     Event e = channel.take();
-    long takeTime = System.currentTimeMillis();
     Assert.assertNotNull(e);
     String s = new String(e.getBody(), Charsets.UTF_8);
     out.add(s);
     LOG.info("Slow take got " + s);
     // sleep so a checkpoint occurs. take is before
     // and commit is after the checkpoint
-    File checkpoint = new File(checkpointDir, "checkpoint");
-    while(checkpoint.lastModified() < takeTime){
-      TimeUnit.MILLISECONDS.sleep(500);
-    }
+    forceCheckpoint(channel);
     tx.commit();
     tx.close();
     channel.stop();
@@ -868,14 +855,7 @@ public class TestFileChannel {
     tx.begin();
     channel.put(EventBuilder.withBody(new byte[]{'c','d'}));
     set.add(new String(new byte[]{'c', 'd'}));
-    File checkpoint = new File(checkpointDir, "checkpoint");
-    long t1 = System.currentTimeMillis();
-    while(checkpoint.lastModified() < t1) {
-      TimeUnit.MILLISECONDS.sleep(500);
-      if (System.currentTimeMillis() - checkpoint.lastModified() > 15000) {
-        throw new TimeoutException("Checkpoint did not happen");
-      }
-    }
+    forceCheckpoint(channel);
     tx.commit();
     tx.close();
     channel.stop();
@@ -916,25 +896,10 @@ public class TestFileChannel {
     tx.begin();
     channel.put(EventBuilder.withBody(new byte[]{'c', 'd'}));
     set.add(new String(new byte[]{'c','d'}));
-    File checkpoint = new File(checkpointDir, "checkpoint");
-    long t1 = System.currentTimeMillis();
-    while (checkpoint.lastModified() < t1) {
-      TimeUnit.MILLISECONDS.sleep(500);
-      if(System.currentTimeMillis() - checkpoint.lastModified() > 15000){
-        throw new TimeoutException("Checkpoint was expected,"
-                + " but did not happen");
-      }
-    }
+    forceCheckpoint(channel);
     tx.commit();
     tx.close();
-    long t2 = System.currentTimeMillis();
-    while(checkpoint.lastModified() < t2){
-      TimeUnit.MILLISECONDS.sleep(500);
-      if (t2 - checkpoint.lastModified() > 15000) {
-        throw new TimeoutException("Checkpoint was expected, "
-                + "but did not happen");
-      }
-    }
+    forceCheckpoint(channel);
     channel.stop();
 
     channel = createFileChannel(overrides);
@@ -953,6 +918,19 @@ public class TestFileChannel {
     channel.stop();
   }
 
+  private static void forceCheckpoint(FileChannel channel) {
+    Log log = field("log")
+        .ofType(Log.class)
+          .in(channel)
+            .get();
+
+    Assert.assertTrue("writeCheckpoint returned false",
+        method("writeCheckpoint")
+          .withReturnType(Boolean.class)
+            .withParameterTypes(Boolean.class)
+              .in(log)
+                .invoke(true));
+  }
   private static void copyDecompressed(String resource, File output)
           throws IOException {
     URL input =  Resources.getResource(resource);


Mime
View raw message