fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [incubator-fluo] branch master updated: fixes #878 made LogIT handle delayed tx close() (#880)
Date Wed, 28 Jun 2017 15:20:29 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a82c28  fixes #878 made LogIT handle delayed tx close() (#880)
8a82c28 is described below

commit 8a82c28302fa1da9dbdfc019bd0679df3ba9bfac
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Wed Jun 28 11:20:27 2017 -0400

    fixes #878 made LogIT handle delayed tx close() (#880)
---
 .../org/apache/fluo/integration/log/LogIT.java     | 179 ++++++++++++---------
 1 file changed, 103 insertions(+), 76 deletions(-)

diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index fe712fe..2f49b4f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -40,6 +40,7 @@ import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.observer.Observer;
 import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.api.observer.StringObserver;
+import org.apache.fluo.core.util.UtilWaitThread;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestUtil;
 import org.apache.log4j.Level;
@@ -56,6 +57,7 @@ public class LogIT extends ITBaseMini {
   private static final Column STAT_COUNT = new Column("stat", "count");
 
   static class SimpleLoader implements Loader {
+    @Override
     public void load(TransactionBase tx, Context context) throws Exception {
       TestUtil.increment(tx, "r1", new Column("a", "b"), 1);
     }
@@ -65,6 +67,7 @@ public class LogIT extends ITBaseMini {
 
     private static final Bytes ROW = Bytes.of(new byte[] {'r', '1', 13});
 
+    @Override
     public void load(TransactionBase tx, Context context) throws Exception {
       TestUtil.increment(tx, ROW, new Column(Bytes.of("a"), Bytes.of(new byte[] {0, 9})),
1);
     }
@@ -262,52 +265,55 @@ public class LogIT extends ITBaseMini {
         Assert.assertTrue(Integer.parseInt(snap.gets("all", STAT_COUNT)) >= 1);
         Assert.assertEquals("1", snap.gets("r1", new Column("a", "b")));
       }
+
+      String logMsgs = writer.toString();
+      logMsgs = logMsgs.replace('\n', ' ');
+
+      String pattern;
+
+      // simple loader should cause this pattern in logs
+      pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+      pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$SimpleLoader";
+      pattern += ".*txid: \\1 get\\(r1, a b \\) -> null";
+      pattern += ".*txid: \\1 set\\(r1, a b , 1\\)";
+      pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
+      pattern += ".*";
+      Assert.assertTrue(logMsgs.matches(pattern));
+      waitForClose(writer, pattern);
+
+      // trigger loader should cause this pattern in logs
+      pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+      pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TriggerLoader";
+      pattern += ".*txid: \\1 set\\(0, stat count , 1\\)";
+      pattern += ".*txid: \\1 setWeakNotification\\(0, stat count \\)";
+      pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
+      pattern += ".*";
+      Assert.assertTrue(logMsgs.matches(pattern));
+      waitForClose(writer, pattern);
+
+      // observer should cause this pattern in logs
+      pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+      pattern += ".*txid: \\1 trigger: 0 stat count  \\d+";
+      pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TestObserver";
+      pattern += ".*txid: \\1 get\\(0, stat count \\) -> 1";
+      pattern += ".*txid: \\1 get\\(all, stat count \\) -> null";
+      pattern += ".*txid: \\1 set\\(all, stat count , 1\\)";
+      pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
+      pattern += ".*";
+      Assert.assertTrue(logMsgs.matches(pattern));
+      waitForClose(writer, pattern);
+
+      // two gets done by snapshot should cause this pattern
+      pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+      pattern += ".*txid: \\1 get\\(all, stat count \\) -> 1";
+      pattern += ".*txid: \\1 get\\(r1, a b \\) -> 1";
+      pattern += ".*txid: \\1 close\\(\\).*";
+      Assert.assertTrue(logMsgs.matches(pattern));
     } finally {
       logger.removeAppender(appender);
       logger.setAdditivity(additivity);
       logger.setLevel(level);
     }
-
-    String logMsgs = writer.toString();
-    logMsgs = logMsgs.replace('\n', ' ');
-
-    String pattern;
-
-    // simple loader should cause this pattern in logs
-    pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$SimpleLoader";
-    pattern += ".*txid: \\1 get\\(r1, a b \\) -> null";
-    pattern += ".*txid: \\1 set\\(r1, a b , 1\\)";
-    pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
-    pattern += ".*txid: \\1 close\\(\\).*";
-    Assert.assertTrue(logMsgs.matches(pattern));
-
-    // trigger loader should cause this pattern in logs
-    pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TriggerLoader";
-    pattern += ".*txid: \\1 set\\(0, stat count , 1\\)";
-    pattern += ".*txid: \\1 setWeakNotification\\(0, stat count \\)";
-    pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
-    pattern += ".*txid: \\1 close\\(\\).*";
-    Assert.assertTrue(logMsgs.matches(pattern));
-
-    // observer should cause this pattern in logs
-    pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 trigger: 0 stat count  \\d+";
-    pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$TestObserver";
-    pattern += ".*txid: \\1 get\\(0, stat count \\) -> 1";
-    pattern += ".*txid: \\1 get\\(all, stat count \\) -> null";
-    pattern += ".*txid: \\1 set\\(all, stat count , 1\\)";
-    pattern += ".*txid: \\1 commit\\(\\) -> SUCCESSFUL commitTs: \\d+";
-    pattern += ".*txid: \\1 close\\(\\).*";
-    Assert.assertTrue(logMsgs.matches(pattern));
-
-    // two gets done by snapshot should cause this pattern
-    pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 get\\(all, stat count \\) -> 1";
-    pattern += ".*txid: \\1 get\\(r1, a b \\) -> 1";
-    pattern += ".*txid: \\1 close\\(\\).*";
-    Assert.assertTrue(logMsgs.matches(pattern));
   }
 
   @Test
@@ -374,6 +380,26 @@ public class LogIT extends ITBaseMini {
     Assert.assertTrue(logMsgs.matches(pattern));
   }
 
+  private void waitForClose(StringWriter writer, String pattern) {
+    // This util function was created for tests using LoaderExecutors. When a LoaderExecutor is
+    // closed, all transactions should be committed. However, the close may still be proceeding in
+    // the background. This util function ensures the close is eventually logged.
+    pattern += "txid: \\1 \\Qclose()\\E.*";
+
+    String origLogMsgs = writer.toString();
+    String logMsgs = origLogMsgs.replace('\n', ' ');
+
+    int count = 0;
+    while (!logMsgs.matches(pattern) && count < 30) {
+      UtilWaitThread.sleep(1000);
+      count++;
+      origLogMsgs = writer.toString();
+      logMsgs = origLogMsgs.replace('\n', ' ');
+    }
+
+    Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
+  }
+
   @Test
   public void testBinaryLogging() throws Exception {
     Logger logger = Logger.getLogger("fluo.tx");
@@ -395,46 +421,47 @@ public class LogIT extends ITBaseMini {
       }
 
       miniFluo.waitForObservers();
+
+      String origLogMsgs = writer.toString();
+      String logMsgs = origLogMsgs.replace('\n', ' ');
+
+      String pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+      pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$BinaryLoader1";
+      pattern += ".*txid: \\1 \\Qdelete(r\\x051, c\\x021 c\\xf51 )\\E";
+      pattern += ".*txid: \\1 \\Qget(r\\x062, c\\x021 c\\xf51 ) -> null\\E";
+      pattern += ".*txid: \\1 \\Qget(r\\x062, [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) ->
[]\\E";
+      pattern +=
+          ".*txid: \\1 \\Qget([r\\x051, r\\x062], [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) ->
[]\\E";
+      pattern += ".*txid: \\1 \\Qset(r\\x051, c\\x092 c\\xe52 , v\\x992)\\E";
+      pattern += ".*txid: \\1 \\Qset(r\\x062, c\\x092 c\\xe52 , v\\xd91)\\E";
+      pattern += ".*txid: \\1 \\QsetWeakNotification(r\\x062, c\\x092 c\\xe52 )\\E";
+      pattern += ".*txid: \\1 \\Qcommit()\\E -> SUCCESSFUL commitTs: \\d+";
+      pattern += ".*";
+      Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
+      waitForClose(writer, pattern);
+
+      String v1 = "\\Qr\\x051=[c\\x092 c\\xe52 =v\\x992]\\E";
+      String v2 = "\\Qr\\x062=[c\\x092 c\\xe52 =v\\xd91]\\E";
+
+      pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
+      pattern += ".*txid: \\1 \\Qtrigger: r\\x062 c\\x092 c\\xe52  4\\E";
+      pattern += ".*txid: \\1 \\Qclass: org.apache.fluo.integration.log.LogIT$BinaryObserver\\E";
+      pattern += ".*txid: \\1 \\Qget(r\\x051, c\\x092 c\\xe52 ) -> v\\x992\\E";
+      pattern +=
+          ".*txid: \\1 \\Qget(r\\x062, [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) -> [c\\x092
c\\xe52 =v\\xd91]\\E";
+      pattern +=
+          ".*txid: \\1 \\Qget([r\\x051, r\\x062], [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) ->
[\\E("
+              + v1 + "|" + v2 + ")\\, (" + v1 + "|" + v2 + ")\\]";
+      pattern += ".*txid: \\1 \\Qcommit() -> SUCCESSFUL commitTs: -1\\E";
+      pattern += ".*";
+      Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
+      waitForClose(writer, pattern);
+
     } finally {
       logger.removeAppender(appender);
       logger.setAdditivity(additivity);
       logger.setLevel(level);
     }
-
-    String origLogMsgs = writer.toString();
-    String logMsgs = origLogMsgs.replace('\n', ' ');
-
-    String pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 class: org.apache.fluo.integration.log.LogIT\\$BinaryLoader1";
-    pattern += ".*txid: \\1 \\Qdelete(r\\x051, c\\x021 c\\xf51 )\\E";
-    pattern += ".*txid: \\1 \\Qget(r\\x062, c\\x021 c\\xf51 ) -> null\\E";
-    pattern += ".*txid: \\1 \\Qget(r\\x062, [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) -> []\\E";
-    pattern +=
-        ".*txid: \\1 \\Qget([r\\x051, r\\x062], [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) ->
[]\\E";
-    pattern += ".*txid: \\1 \\Qset(r\\x051, c\\x092 c\\xe52 , v\\x992)\\E";
-    pattern += ".*txid: \\1 \\Qset(r\\x062, c\\x092 c\\xe52 , v\\xd91)\\E";
-    pattern += ".*txid: \\1 \\QsetWeakNotification(r\\x062, c\\x092 c\\xe52 )\\E";
-    pattern += ".*txid: \\1 \\Qcommit()\\E -> SUCCESSFUL commitTs: \\d+";
-    pattern += ".*txid: \\1 \\Qclose()\\E";
-    pattern += ".*";
-    Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
-
-    String v1 = "\\Qr\\x051=[c\\x092 c\\xe52 =v\\x992]\\E";
-    String v2 = "\\Qr\\x062=[c\\x092 c\\xe52 =v\\xd91]\\E";
-
-    pattern = ".*txid: (\\d+) begin\\(\\) thread: \\d+";
-    pattern += ".*txid: \\1 \\Qtrigger: r\\x062 c\\x092 c\\xe52  4\\E";
-    pattern += ".*txid: \\1 \\Qclass: org.apache.fluo.integration.log.LogIT$BinaryObserver\\E";
-    pattern += ".*txid: \\1 \\Qget(r\\x051, c\\x092 c\\xe52 ) -> v\\x992\\E";
-    pattern +=
-        ".*txid: \\1 \\Qget(r\\x062, [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) -> [c\\x092
c\\xe52 =v\\xd91]\\E";
-    pattern +=
-        ".*txid: \\1 \\Qget([r\\x051, r\\x062], [c\\x021 c\\xf51 , c\\x092 c\\xe52 ]) ->
[\\E("
-            + v1 + "|" + v2 + ")\\, (" + v1 + "|" + v2 + ")\\]";
-    pattern += ".*txid: \\1 \\Qcommit() -> SUCCESSFUL commitTs: -1\\E";
-    pattern += ".*txid: \\1 \\Qclose()\\E";
-    pattern += ".*";
-    Assert.assertTrue(origLogMsgs, logMsgs.matches(pattern));
   }
 
   private void assertEqual(CellScanner cs, RowColumnValue... rcvs) {

-- 
To stop receiving notification emails like this one, please contact
"commits@fluo.apache.org" <commits@fluo.apache.org>.

Mime
View raw message