activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1474 TimedBuffer improved doc and refactored dead branches on methods
Date Mon, 23 Oct 2017 18:53:36 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9a1291279 -> 0ef2c15ab


ARTEMIS-1474 TimedBuffer improved doc and refactored dead branches on methods


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f5dfbf7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f5dfbf7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f5dfbf7f

Branch: refs/heads/master
Commit: f5dfbf7f12d0373e6b712dd8cd1ce43b8da8dd6a
Parents: 9a12912
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Fri Oct 20 14:38:39 2017 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Oct 23 14:44:09 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/io/buffer/TimedBuffer.java     | 85 +++++++++++---------
 1 file changed, 48 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5dfbf7f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 0015dc5..6fc3e6b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -34,6 +34,9 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 public final class TimedBuffer {
+
+   private static final double MAX_TIMEOUT_ERROR_FACTOR = 1.5;
+
    // Constants -----------------------------------------------------
 
    // The number of tries on sleep before switching to spin
@@ -84,9 +87,6 @@ public final class TimedBuffer {
 
    private TimerTask logRatesTimerTask;
 
-   //used only in the timerThread do not synchronization
-   private boolean useSleep = true;
-
    // no need to be volatile as every access is synchronized
    private boolean spinning = false;
 
@@ -269,20 +269,21 @@ public final class TimedBuffer {
    }
 
    public void flush() {
-      flush(false);
+      flushBatch();
    }
 
    /**
-    * force means the Journal is moving to a new file. Any pending write need to be done immediately
-    * or data could be lost
+    * Attempts to flush if {@code !delayFlush} and {@code buffer} is filled by any data.
+    *
+    * @return {@code true} when are flushed any bytes, {@code false} otherwise
     */
-   public void flush(final boolean force) {
+   public boolean flushBatch() {
       synchronized (this) {
          if (!started) {
             throw new IllegalStateException("TimedBuffer is not started");
          }
 
-         if ((force || !delayFlush) && buffer.writerIndex() > 0) {
+         if (!delayFlush && buffer.writerIndex() > 0) {
             int pos = buffer.writerIndex();
 
             if (logRates) {
@@ -310,6 +311,10 @@ public final class TimedBuffer {
             bufferLimit = 0;
 
             flushesDone.incrementAndGet();
+
+            return pos > 0;
+         } else {
+            return false;
          }
       }
    }
@@ -373,6 +378,7 @@ public final class TimedBuffer {
       @Override
       public void run() {
          long lastFlushTime = System.nanoTime();
+         boolean useSleep = true;
 
          while (!closed) {
             // We flush on the timer if there are pending syncs there and we've waited at least one
@@ -384,20 +390,24 @@ public final class TimedBuffer {
                if (useSleep) {
                   // if using sleep, we will always flush
                   lastFlushTime = System.nanoTime();
-                  flush();
-
+                  if (flushBatch()) {
+                     //it could wait until the timeout is expired
+                     final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
+
+                     // example: Say the device took 20% of the time to write..
+                     //          We only need to wait 80% more..
+                     //          timeFromTheLastFlush would be the difference
+                     //          And if the device took more than that time, there's no need to wait at all.
+                     final long timeToSleep = timeout - timeFromTheLastFlush;
+                     if (timeToSleep > 0) {
+                        useSleep = sleepIfPossible(timeToSleep);
+                     }
+                  }
                } else if (bufferObserver != null && System.nanoTime() - lastFlushTime > timeout) {
                   lastFlushTime = System.nanoTime();
                   // if not using flush we will spin and do the time checks manually
                   flush();
                }
-
-            }
-            //it could wait until the timeout is expired
-            final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
-            final long timeToSleep = timeout - timeFromTheLastFlush;
-            if (timeToSleep > 0) {
-               sleepIfPossible(timeToSleep);
             }
 
             try {
@@ -413,34 +423,35 @@ public final class TimedBuffer {
       }
 
       /**
-       * We will attempt to use sleep only if the system supports nano-sleep
+       * We will attempt to use sleep only if the system supports nano-sleep.
        * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
        * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
        */
-      private void sleepIfPossible(long nanosToSleep) {
-         if (useSleep) {
-            try {
-               final long startSleep = System.nanoTime();
-               sleep(nanosToSleep);
+      private boolean sleepIfPossible(long nanosToSleep) {
+         boolean useSleep = true;
+         try {
+            final long startSleep = System.nanoTime();
+            sleep(nanosToSleep);
+            if (checks < MAX_CHECKS_ON_SLEEP) {
                final long elapsedSleep = System.nanoTime() - startSleep;
-               if (checks < MAX_CHECKS_ON_SLEEP) {
-                  // I'm letting the real time to be up to 50% than the requested sleep.
-                  if (elapsedSleep > nanosToSleep * 1.5) {
-                     failedChecks++;
-                  }
+               // I'm letting the real time to be up to 50% than the requested sleep.
+               if (elapsedSleep > (nanosToSleep * MAX_TIMEOUT_ERROR_FACTOR)) {
+                  failedChecks++;
+               }
 
-                  if (++checks >= MAX_CHECKS_ON_SLEEP) {
-                     if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
-                        ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
-                        useSleep = false;
-                     }
+
+               if (++checks >= MAX_CHECKS_ON_SLEEP) {
+                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
+                     ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
+                     useSleep = false;
                   }
                }
-            } catch (Exception e) {
-               useSleep = false;
-               ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
             }
+         } catch (Exception e) {
+            useSleep = false;
+            ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
          }
+         return useSleep;
       }
 
       public void close() {
@@ -486,4 +497,4 @@ public final class TimedBuffer {
       }
    }
 
-}
+}
\ No newline at end of file


Mime
View raw message