activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject activemq-artemis git commit: ARTEMIS-1151 Adapting TimedBuffer and NIO Buffer Pooling
Date Mon, 08 May 2017 22:39:35 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 5ec545a79 -> 25094f272


ARTEMIS-1151 Adapting TimedBuffer and NIO Buffer Pooling

- NIO/ASYNCIO new TimedBuffer with adapting batch window heuristic
- NIO/ASYNCIO improved TimedBuffer write monitoring with
  lightweight concurrent performance counters
- NIO/ASYNCIO journal/paging operations benefit from fewer buffer copies
- NIO/ASYNCIO any buffer copy is always performed with raw batch copy
  using SIMD intrinsics (System::arraycopy) or memcpy under the hood
- NIO improved clear buffers using SIMD intrinsics (Arrays::fill) and/or memset
- NIO journal operations perform by default TLABs allocation pooling (off heap)
  retaining only the last max sized buffer
- NIO improved file copy operations using zero-copy FileChannel::transferTo
- NIO improved zeroing using pooled single OS page buffer to clean the file
  + pwrite (on Linux)
- NIO deterministic release of unpooled direct buffers to avoid OOM errors
  due to slow GC
- Exposed OS PAGE SIZE value using Env class

(cherry picked from commit 21c9ed85cf6b9a53debdd32747bd42b2e733da80)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/25094f27
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/25094f27
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/25094f27

Branch: refs/heads/1.x
Commit: 25094f27217f8bd561d8d230d2624fd76f059d66
Parents: 5ec545a
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Tue May 2 11:47:44 2017 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon May 8 18:31:23 2017 -0400

----------------------------------------------------------------------
 .../cli/commands/util/SyncCalculation.java      |   1 +
 .../org/apache/activemq/artemis/utils/Env.java  | 108 ++++++++
 .../artemis/core/io/AbstractSequentialFile.java |  30 +--
 .../artemis/core/io/buffer/TimedBuffer.java     | 252 +++++++++----------
 .../artemis/core/io/nio/NIOSequentialFile.java  | 105 +++++---
 .../core/io/nio/NIOSequentialFileFactory.java   |  82 +++++-
 .../unit/core/journal/impl/TimedBufferTest.java | 160 ------------
 7 files changed, 389 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index 02db655..f4fbfee 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -179,6 +179,7 @@ public class SyncCalculation {
 
          case NIO:
             factory = new NIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
+            ((NIOSequentialFileFactory) factory).disableBufferReuse();
             factory.start();
             return factory;
          case ASYNCIO:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
new file mode 100644
index 0000000..94f69d3
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+
+/**
+ * Utility that detects various properties specific to the current runtime
+ * environment, such as JVM bitness and OS type.
+ */
+public final class Env {
+
+   private static final int OS_PAGE_SIZE;
+
+   static {
+      //most common OS page size value
+      int osPageSize = 4096;
+      sun.misc.Unsafe instance;
+      try {
+         Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+         field.setAccessible(true);
+         instance = (sun.misc.Unsafe) field.get((Object) null);
+      } catch (Throwable t) {
+         try {
+            Constructor<sun.misc.Unsafe> c = sun.misc.Unsafe.class.getDeclaredConstructor(new
Class[0]);
+            c.setAccessible(true);
+            instance = c.newInstance(new Object[0]);
+         } catch (Throwable t1) {
+            instance = null;
+         }
+      }
+      if (instance != null) {
+         osPageSize = instance.pageSize();
+      }
+      OS_PAGE_SIZE = osPageSize;
+   }
+
+   /**
+    * The system will change a few logs and semantics to be suitable to
+    * run a long testsuite.
+    * Like a few log entries that are only valid during a production system.
+    * or a few cases we need to know as warn on the testsuite and as log in production.
+    */
+   private static boolean testEnv = false;
+
+   private static final String OS = System.getProperty("os.name").toLowerCase();
+   private static final boolean IS_LINUX = OS.startsWith("linux");
+   private static final boolean IS_64BIT = checkIs64bit();
+
+   private Env() {
+
+   }
+
+   /**
+    * Return the size in bytes of a OS memory page.
+    * This value will always be a power of two.
+    */
+   public static int osPageSize() {
+      return OS_PAGE_SIZE;
+   }
+
+   public static boolean isTestEnv() {
+      return testEnv;
+   }
+
+   public static void setTestEnv(boolean testEnv) {
+      Env.testEnv = testEnv;
+   }
+
+   public static boolean isLinuxOs() {
+      return IS_LINUX == true;
+   }
+
+   public static boolean is64BitJvm() {
+      return IS_64BIT;
+   }
+
+   private static boolean checkIs64bit() {
+      //check the more used JVMs
+      String systemProp;
+      systemProp = System.getProperty("com.ibm.vm.bitmode");
+      if (systemProp != null) {
+         return "64".equals(systemProp);
+      }
+      systemProp = System.getProperty("sun.arch.data.model");
+      if (systemProp != null) {
+         return "64".equals(systemProp);
+      }
+      systemProp = System.getProperty("java.vm.version");
+      return systemProp != null && systemProp.contains("_64");
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index cd15246..f6cb9b0 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -189,9 +189,12 @@ public abstract class AbstractSequentialFile implements SequentialFile
{
          bytes.setIndex(0, bytes.capacity());
          timedBuffer.addBytes(bytes, sync, callback);
       } else {
-         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
-         buffer.put(bytes.toByteBuffer().array());
-         buffer.rewind();
+         final int readableBytes = bytes.readableBytes();
+         final ByteBuffer buffer = factory.newBuffer(readableBytes);
+         //factory::newBuffer doesn't necessary return a buffer with limit == readableBytes!!
+         buffer.limit(readableBytes);
+         bytes.getBytes(bytes.readerIndex(), buffer);
+         buffer.flip();
          writeDirect(buffer, sync, callback);
       }
    }
@@ -215,15 +218,12 @@ public abstract class AbstractSequentialFile implements SequentialFile
{
       if (timedBuffer != null) {
          timedBuffer.addBytes(bytes, sync, callback);
       } else {
-         ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
-
-         // If not using the TimedBuffer, a final copy is necessary
-         // Because AIO will need a specific Buffer
-         // And NIO will also need a whole buffer to perform the write
-
+         final int encodedSize = bytes.getEncodeSize();
+         ByteBuffer buffer = factory.newBuffer(encodedSize);
          ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
          bytes.encode(outBuffer);
-         buffer.rewind();
+         buffer.clear();
+         buffer.limit(encodedSize);
          writeDirect(buffer, sync, callback);
       }
    }
@@ -255,9 +255,10 @@ public abstract class AbstractSequentialFile implements SequentialFile
{
 
       @Override
       public void done() {
-         for (IOCallback callback : delegates) {
+         final int size = delegates.size();
+         for (int i = 0; i < size; i++) {
             try {
-               callback.done();
+               delegates.get(i).done();
             } catch (Throwable e) {
                ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
             }
@@ -266,9 +267,10 @@ public abstract class AbstractSequentialFile implements SequentialFile
{
 
       @Override
       public void onError(final int errorCode, final String errorMessage) {
-         for (IOCallback callback : delegates) {
+         final int size = delegates.size();
+         for (int i = 0; i < size; i++) {
             try {
-               callback.onError(errorCode, errorMessage);
+               delegates.get(i).onError(errorCode, errorMessage);
             } catch (Throwable e) {
                ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 91e5e12..32b15fe 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -18,27 +18,25 @@ package org.apache.activemq.artemis.core.io.buffer;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
-public class TimedBuffer {
+public final class TimedBuffer {
    // Constants -----------------------------------------------------
 
-   // The number of tries on sleep before switching to spin
-   public static final int MAX_CHECKS_ON_SLEEP = 20;
-
    // Attributes ----------------------------------------------------
 
    private TimedBufferObserver bufferObserver;
@@ -58,10 +56,9 @@ public class TimedBuffer {
 
    private List<IOCallback> callbacks;
 
-   private volatile int timeout;
+   private final int timeout;
 
-   // used to measure sync requests. When a sync is requested, it shouldn't take more than
timeout to happen
-   private volatile boolean pendingSync = false;
+   private final AtomicLong pendingSyncs = new AtomicLong();
 
    private Thread timerThread;
 
@@ -76,7 +73,7 @@ public class TimedBuffer {
 
    private final boolean logRates;
 
-   private final AtomicLong bytesFlushed = new AtomicLong(0);
+   private long bytesFlushed = 0;
 
    private final AtomicLong flushesDone = new AtomicLong(0);
 
@@ -84,8 +81,6 @@ public class TimedBuffer {
 
    private TimerTask logRatesTimerTask;
 
-   private boolean useSleep = true;
-
    // no need to be volatile as every access is synchronized
    private boolean spinning = false;
 
@@ -104,27 +99,18 @@ public class TimedBuffer {
          logRatesTimer = new Timer(true);
       }
       // Setting the interval for nano-sleeps
-
-      buffer = ActiveMQBuffers.fixedBuffer(bufferSize);
+      //prefer off heap buffer to allow further humongous allocations and reduce GC overhead
+      buffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size));
 
       buffer.clear();
 
       bufferLimit = 0;
 
-      callbacks = new ArrayList<>();
+      callbacks = null;
 
       this.timeout = timeout;
    }
 
-   // for Debug purposes
-   public synchronized boolean isUseSleep() {
-      return useSleep;
-   }
-
-   public synchronized void setUseSleep(boolean useSleep) {
-      this.useSleep = useSleep;
-   }
-
    public synchronized void start() {
       if (started) {
          return;
@@ -232,7 +218,28 @@ public class TimedBuffer {
    }
 
    public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final
IOCallback callback) {
-      addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+      if (!started) {
+         throw new IllegalStateException("TimedBuffer is not started");
+      }
+
+      delayFlush = false;
+
+      //it doesn't modify the reader index of bytes as in the original version
+      final int readableBytes = bytes.readableBytes();
+      final int writerIndex = buffer.writerIndex();
+      buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
+      buffer.writerIndex(writerIndex + readableBytes);
+
+      if (callbacks == null) {
+         callbacks = new ArrayList<>();
+      }
+      callbacks.add(callback);
+
+      if (sync) {
+         final long currentPendingSyncs = pendingSyncs.get();
+         pendingSyncs.lazySet(currentPendingSyncs + 1);
+         startSpin();
+      }
    }
 
    public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final
IOCallback callback) {
@@ -244,11 +251,14 @@ public class TimedBuffer {
 
       bytes.encode(buffer);
 
+      if (callbacks == null) {
+         callbacks = new ArrayList<>();
+      }
       callbacks.add(callback);
 
       if (sync) {
-         pendingSync = true;
-
+         final long currentPendingSyncs = pendingSyncs.get();
+         pendingSyncs.lazySet(currentPendingSyncs + 1);
          startSpin();
       }
 
@@ -262,45 +272,49 @@ public class TimedBuffer {
     * force means the Journal is moving to a new file. Any pending write need to be done
immediately
     * or data could be lost
     */
-   public void flush(final boolean force) {
+   private void flush(final boolean force) {
       synchronized (this) {
          if (!started) {
             throw new IllegalStateException("TimedBuffer is not started");
          }
 
          if ((force || !delayFlush) && buffer.writerIndex() > 0) {
-            int pos = buffer.writerIndex();
+            final int pos = buffer.writerIndex();
 
-            if (logRates) {
-               bytesFlushed.addAndGet(pos);
-            }
+            final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+            //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos
or limit == bufferSize!!
+            bufferToFlush.limit(pos);
+            //perform memcpy under the hood due to the off heap buffer
+            buffer.getBytes(0, bufferToFlush);
 
-            ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-
-            // Putting a byteArray on a native buffer is much faster, since it will do in
a single native call.
-            // Using bufferToFlush.put(buffer) would make several append calls for each byte
-            // We also transfer the content of this buffer to the native file's buffer
-
-            bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
-
-            bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+            final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList()
: callbacks;
+            bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks);
 
             stopSpin();
 
-            pendingSync = false;
+            pendingSyncs.lazySet(0);
 
-            // swap the instance as the previous callback list is being used asynchronously
-            callbacks = new LinkedList<>();
+            callbacks = null;
 
             buffer.clear();
 
             bufferLimit = 0;
 
-            flushesDone.incrementAndGet();
+            if (logRates) {
+               logFlushed(pos);
+            }
          }
       }
    }
 
+   private void logFlushed(int bytes) {
+      this.bytesFlushed += bytes;
+      //more lightweight than XADD if single writer
+      final long currentFlushesDone = flushesDone.get();
+      //flushesDone::lazySet write-Release bytesFlushed
+      flushesDone.lazySet(currentFlushesDone + 1L);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -324,21 +338,21 @@ public class TimedBuffer {
          if (!closed) {
             long now = System.currentTimeMillis();
 
-            long bytesF = bytesFlushed.get();
-            long flushesD = flushesDone.get();
-
+            final long flushesDone = TimedBuffer.this.flushesDone.get();
+            //flushesDone::get read-Acquire bytesFlushed
+            final long bytesFlushed = TimedBuffer.this.bytesFlushed;
             if (lastExecution != 0) {
-               double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
+               final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now
- lastExecution);
                ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
-               double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
+               final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) /
(now - lastExecution);
                ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
             }
 
             lastExecution = now;
 
-            lastBytesFlushed = bytesF;
+            lastBytesFlushed = bytesFlushed;
 
-            lastFlushesDone = flushesD;
+            lastFlushesDone = flushesDone;
          }
       }
 
@@ -354,84 +368,40 @@ public class TimedBuffer {
 
       private volatile boolean closed = false;
 
-      int checks = 0;
-      int failedChecks = 0;
-      long timeBefore = 0;
-
-      final int sleepMillis = timeout / 1000000; // truncates
-      final int sleepNanos = timeout % 1000000;
-
       @Override
       public void run() {
+         int waitTimes = 0;
          long lastFlushTime = 0;
+         long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
+         final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
+         final long timeout = TimedBuffer.this.timeout;
 
          while (!closed) {
-            // We flush on the timer if there are pending syncs there and we've waited at
least one
-            // timeout since the time of the last flush.
-            // Effectively flushing "resets" the timer
-            // On the timeout verification, notice that we ignore the timeout check if we
are using sleep
-
-            if (pendingSync) {
-               if (isUseSleep()) {
-                  // if using sleep, we will always flush
-                  flush();
-                  lastFlushTime = System.nanoTime();
-               } else if (bufferObserver != null && System.nanoTime() > lastFlushTime
+ timeout) {
-                  // if not using flush we will spin and do the time checks manually
-                  flush();
-                  lastFlushTime = System.nanoTime();
+            boolean flushed = false;
+            final long currentPendingSyncs = pendingSyncs.get();
+
+            if (currentPendingSyncs > 0) {
+               if (bufferObserver != null) {
+                  final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout;
+                  if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
+                     flush();
+                     if (checkpoint) {
+                        estimatedOptimalBatch = currentPendingSyncs;
+                     } else {
+                        estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
+                     }
+                     lastFlushTime = System.nanoTime();
+                     //a flush has been requested
+                     flushed = true;
+                  }
                }
-
             }
 
-            sleepIfPossible();
-
-            try {
-               spinLimiter.acquire();
-
-               Thread.yield();
-
-               spinLimiter.release();
-            } catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
-            }
-         }
-      }
-
-      /**
-       * We will attempt to use sleep only if the system supports nano-sleep
-       * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
-       * if more than 50% of the checks have failed we will cancel the sleep and just use
regular spin
-       */
-      private void sleepIfPossible() {
-         if (isUseSleep()) {
-            if (checks < MAX_CHECKS_ON_SLEEP) {
-               timeBefore = System.nanoTime();
-            }
-
-            try {
-               sleep(sleepMillis, sleepNanos);
-            } catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
-            } catch (Exception e) {
-               setUseSleep(false);
-               ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer,
using spin now", e);
-            }
-
-            if (checks < MAX_CHECKS_ON_SLEEP) {
-               long realTimeSleep = System.nanoTime() - timeBefore;
-
-               // I'm letting the real time to be up to 50% than the requested sleep.
-               if (realTimeSleep > timeout * 1.5) {
-                  failedChecks++;
-               }
-
-               if (++checks >= MAX_CHECKS_ON_SLEEP) {
-                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
-                     ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is
not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer
will spin for timeouts");
-                     setUseSleep(false);
-                  }
-               }
+            if (flushed) {
+               waitTimes = 0;
+            } else {
+               //instead of interruptible sleeping, perform progressive parks depending on
the load
+               waitTimes = TimedBuffer.wait(waitTimes, spinLimiter);
             }
          }
       }
@@ -441,15 +411,33 @@ public class TimedBuffer {
       }
    }
 
-   /**
-    * Sub classes (tests basically) can use this to override how the sleep is being done
-    *
-    * @param sleepMillis
-    * @param sleepNanos
-    * @throws InterruptedException
-    */
-   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
-      Thread.sleep(sleepMillis, sleepNanos);
+   private static int wait(int waitTimes, Semaphore spinLimiter) {
+      if (waitTimes < 10) {
+         //doesn't make sense to spin loop here, because of the lock around flush/addBytes
operations!
+         Thread.yield();
+         waitTimes++;
+      } else if (waitTimes < 20) {
+         LockSupport.parkNanos(1L);
+         waitTimes++;
+      } else if (waitTimes < 50) {
+         LockSupport.parkNanos(10L);
+         waitTimes++;
+      } else if (waitTimes < 100) {
+         LockSupport.parkNanos(100L);
+         waitTimes++;
+      } else if (waitTimes < 1000) {
+         LockSupport.parkNanos(1000L);
+         waitTimes++;
+      } else {
+         LockSupport.parkNanos(100_000L);
+         try {
+            spinLimiter.acquire();
+            spinLimiter.release();
+         } catch (InterruptedException e) {
+            throw new ActiveMQInterruptedException(e);
+         }
+      }
+      return waitTimes;
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 29e5b81..d1e333e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -22,6 +22,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.Env;
 
 public final class NIOSequentialFile extends AbstractSequentialFile {
 
@@ -40,9 +43,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
 
    private RandomAccessFile rfile;
 
-   private final int defaultMaxIO;
-
-   private int maxIO;
+   private final int maxIO;
 
    public NIOSequentialFile(final SequentialFileFactory factory,
                             final File directory,
@@ -50,7 +51,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
                             final int maxIO,
                             final Executor writerExecutor) {
       super(directory, file, factory, writerExecutor);
-      defaultMaxIO = maxIO;
+      this.maxIO = maxIO;
    }
 
    @Override
@@ -69,7 +70,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
     */
    @Override
    public synchronized void open() throws IOException {
-      open(defaultMaxIO, true);
+      open(maxIO, true);
    }
 
    @Override
@@ -90,31 +91,38 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public void fill(final int size) throws IOException {
-      ByteBuffer bb = ByteBuffer.allocate(size);
-
-      bb.limit(size);
-      bb.position(0);
-
       try {
-         channel.position(0);
-         channel.write(bb);
-         channel.force(false);
-         channel.position(0);
+         //uses the most common OS page size to match the Page Cache entry size and reduce
JVM memory footprint
+         final int zeroPageCapacity = Env.osPageSize();
+         final ByteBuffer zeroPage = this.factory.newBuffer(zeroPageCapacity);
+         try {
+            int bytesToWrite = size;
+            long writePosition = 0;
+            while (bytesToWrite > 0) {
+               zeroPage.clear();
+               final int zeroPageLimit = Math.min(bytesToWrite, zeroPageCapacity);
+               zeroPage.limit(zeroPageLimit);
+               //use the cheaper pwrite instead of fseek + fwrite
+               final int writtenBytes = channel.write(zeroPage, writePosition);
+               bytesToWrite -= writtenBytes;
+               writePosition += writtenBytes;
+            }
+            if (factory.isDatasync()) {
+               channel.force(true);
+            }
+            //set the position to 0 to match the fill contract
+            channel.position(0);
+            fileSize = size;
+         } finally {
+            //return it to the factory
+            this.factory.releaseBuffer(zeroPage);
+         }
       } catch (ClosedChannelException e) {
          throw e;
       } catch (IOException e) {
          factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(),
this);
          throw e;
       }
-      channel.force(true);
-
-      fileSize = channel.size();
-   }
-
-   public synchronized void waitForClose() throws InterruptedException {
-      while (isOpen()) {
-         wait();
-      }
    }
 
    @Override
@@ -247,10 +255,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
       internalWrite(bytes, sync, null);
    }
 
-   public void writeInternal(final ByteBuffer bytes) throws Exception {
-      internalWrite(bytes, true, null);
-   }
-
    @Override
    protected ByteBuffer newBuffer(int size, final int limit) {
       // For NIO, we don't need to allocate a buffer the entire size of the timed buffer,
unlike AIO
@@ -293,14 +297,51 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
    private void doInternalWrite(final ByteBuffer bytes,
                                 final boolean sync,
                                 final IOCallback callback) throws IOException {
-      channel.write(bytes);
+      try {
+         channel.write(bytes);
 
-      if (sync) {
-         sync();
+         if (sync) {
+            sync();
+         }
+
+         if (callback != null) {
+            callback.done();
+         }
+      } finally {
+         //release it to recycle the write buffer if big enough
+         this.factory.releaseBuffer(bytes);
       }
+   }
 
-      if (callback != null) {
-         callback.done();
+   @Override
+   public void copyTo(SequentialFile dstFile) throws IOException {
+      if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
+         ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + dstFile);
+      }
+      if (isOpen()) {
+         throw new IllegalStateException("File opened!");
+      }
+      if (dstFile.isOpen()) {
+         throw new IllegalArgumentException("dstFile must be closed too");
+      }
+      try (RandomAccessFile src = new RandomAccessFile(getFile(), "rw");
+           FileChannel srcChannel = src.getChannel();
+           FileLock srcLock = srcChannel.lock()) {
+         final long readableBytes = srcChannel.size();
+         if (readableBytes > 0) {
+            try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
+                 FileChannel dstChannel = dst.getChannel();
+                 FileLock dstLock = dstChannel.lock()) {
+               final long oldLength = dst.length();
+               final long newLength = oldLength + readableBytes;
+               dst.setLength(newLength);
+               final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
+               if (transferred != readableBytes) {
+                  dstChannel.truncate(oldLength);
+                  throw new IOException("copied less then expected");
+               }
+            }
+         }
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index f90bebf..781176e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -19,13 +19,23 @@ package org.apache.activemq.artemis.core.io.nio;
 import java.io.File;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
+import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.ArtemisConstants;
 import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.utils.Env;
 
-public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
+public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
+
+   private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize();
+
+   private boolean bufferPooling;
+
+   //pools only the biggest one -> optimized for the common case
+   private final ThreadLocal<ByteBuffer> bytesPool;
 
    public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
       this(journalDir, null, maxIO);
@@ -63,6 +73,8 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
                                    final boolean logRates,
                                    final IOCriticalErrorListener listener) {
       super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
+      this.bufferPooling = true;
+      this.bytesPool = new ThreadLocal<>();
    }
 
    public static ByteBuffer allocateDirectByteBuffer(final int size) {
@@ -91,6 +103,14 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
       return buffer2;
    }
 
+   public void enableBufferReuse() {
+      this.bufferPooling = true;
+   }
+
+   public void disableBufferReuse() {
+      this.bufferPooling = false;
+   }
+
    @Override
    public SequentialFile createSequentialFile(final String fileName) {
       return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
@@ -101,31 +121,71 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
       return timedBuffer != null;
    }
 
+   private static int align(final int value, final int pow2alignment) {
+      return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
+   }
+
    @Override
    public ByteBuffer allocateDirectBuffer(final int size) {
-      return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
+      final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
+      final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
+      byteBuffer.limit(size);
+      return byteBuffer;
    }
 
    @Override
    public void releaseDirectBuffer(ByteBuffer buffer) {
-      // nothing we can do on this case. we can just have good faith on GC
+      PlatformDependent.freeDirectBuffer(buffer);
    }
 
    @Override
    public ByteBuffer newBuffer(final int size) {
-      return ByteBuffer.allocate(size);
+      if (!this.bufferPooling) {
+         return allocateDirectBuffer(size);
+      } else {
+         final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
+         ByteBuffer byteBuffer = bytesPool.get();
+         if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
+            //do not free the old one (if any) until the new one will be released into the pool!
+            byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
+         } else {
+            bytesPool.set(null);
+            PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
+            byteBuffer.clear();
+         }
+         byteBuffer.limit(size);
+         return byteBuffer;
+      }
    }
 
    @Override
-   public void clearBuffer(final ByteBuffer buffer) {
-      final int limit = buffer.limit();
-      buffer.rewind();
-
-      for (int i = 0; i < limit; i++) {
-         buffer.put((byte) 0);
+   public void releaseBuffer(ByteBuffer buffer) {
+      if (this.bufferPooling) {
+         if (buffer.isDirect()) {
+            final ByteBuffer byteBuffer = bytesPool.get();
+            if (byteBuffer != buffer) {
+               //replace with the current pooled only if greater or null
+               if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
+                  if (byteBuffer != null) {
+                     //free the smaller one
+                     PlatformDependent.freeDirectBuffer(byteBuffer);
+                  }
+                  bytesPool.set(buffer);
+               } else {
+                  PlatformDependent.freeDirectBuffer(buffer);
+               }
+            }
+         }
       }
+   }
 
-      buffer.rewind();
+   @Override
+   public void clearBuffer(final ByteBuffer buffer) {
+      if (buffer.isDirect()) {
+         PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte) 0);
+      } else {
+         Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.limit(), (byte) 0);
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index 31cb970..b2f65cd 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -205,162 +203,4 @@ public class TimedBufferTest extends ActiveMQTestBase {
       }
 
    }
-
-   /**
-    * This test will verify if the system will switch to spin case the system can't perform sleeps timely
-    * due to proper kernel installations
-    *
-    * @throws Exception
-    */
-   @Test
-   public void testVerifySwitchToSpin() throws Exception {
-      class TestObserver implements TimedBufferObserver {
-
-         @Override
-         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
-         }
-
-         /* (non-Javadoc)
-          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         @Override
-         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-            return ByteBuffer.allocate(maxSize);
-         }
-
-         @Override
-         public int getRemainingBytes() {
-            return 1024 * 1024;
-         }
-      }
-
-      final CountDownLatch sleptLatch = new CountDownLatch(1);
-
-      TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
-
-         @Override
-         protected void stopSpin() {
-            // keeps spinning forever
-         }
-
-         @Override
-         protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
-            Thread.sleep(10);
-         }
-
-         @Override
-         public synchronized void setUseSleep(boolean param) {
-            super.setUseSleep(param);
-            sleptLatch.countDown();
-         }
-
-      };
-
-      timedBuffer.start();
-
-      try {
-
-         timedBuffer.setObserver(new TestObserver());
-
-         int x = 0;
-
-         byte[] bytes = new byte[10];
-         for (int j = 0; j < 10; j++) {
-            bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
-         }
-
-         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
-
-         timedBuffer.checkSize(10);
-         timedBuffer.addBytes(buff, true, dummyCallback);
-
-         sleptLatch.await(10, TimeUnit.SECONDS);
-
-         assertFalse(timedBuffer.isUseSleep());
-      } finally {
-         timedBuffer.stop();
-      }
-
-   }
-
-   /**
-    * This test will verify if the system will switch to spin case the system can't perform sleeps timely
-    * due to proper kernel installations
-    *
-    * @throws Exception
-    */
-   @Test
-   public void testStillSleeps() throws Exception {
-      class TestObserver implements TimedBufferObserver {
-
-         @Override
-         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
-         }
-
-         /* (non-Javadoc)
-          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         @Override
-         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-            return ByteBuffer.allocate(maxSize);
-         }
-
-         @Override
-         public int getRemainingBytes() {
-            return 1024 * 1024;
-         }
-      }
-
-      final CountDownLatch sleptLatch = new CountDownLatch(TimedBuffer.MAX_CHECKS_ON_SLEEP);
-
-      TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
-
-         @Override
-         protected void stopSpin() {
-            // keeps spinning forever
-         }
-
-         @Override
-         protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
-            sleptLatch.countDown();
-            // no sleep
-         }
-
-         @Override
-         public synchronized void setUseSleep(boolean param) {
-            super.setUseSleep(param);
-            sleptLatch.countDown();
-         }
-
-      };
-
-      timedBuffer.start();
-
-      try {
-
-         timedBuffer.setObserver(new TestObserver());
-
-         int x = 0;
-
-         byte[] bytes = new byte[10];
-         for (int j = 0; j < 10; j++) {
-            bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
-         }
-
-         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
-
-         timedBuffer.checkSize(10);
-         timedBuffer.addBytes(buff, true, dummyCallback);
-
-         // waits all the sleeps to be done
-         sleptLatch.await(10, TimeUnit.SECONDS);
-
-         // keeps waiting a bit longer
-         Thread.sleep(100);
-
-         assertTrue(timedBuffer.isUseSleep());
-      } finally {
-         timedBuffer.stop();
-      }
-   }
 }


Mime
View raw message