activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1266 Mapped Journal refactoring
Date Sat, 01 Jul 2017 01:21:09 GMT
ARTEMIS-1266 Mapped Journal refactoring

The MAPPED journal refactoring include:
 - simplified lifecycle and logic (eg fixed file size with single mmap memory region)
 - supports for the TimedBuffer to coalesce msyncs (via Decorator pattern)
 - TLAB pooling of direct ByteBuffer like the NIO journal
 - remove of old benchmarks and benchmark dependencies


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7075e2e4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7075e2e4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7075e2e4

Branch: refs/heads/master
Commit: 7075e2e457d6d87f6a870da07658d47d5d29d461
Parents: 43dcc57
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Thu Jun 29 17:58:54 2017 +0200
Committer: Francesco Nigro <nigro.fra@gmail.com>
Committed: Fri Jun 30 16:17:19 2017 +0200

----------------------------------------------------------------------
 .../cli/commands/util/SyncCalculation.java      |  11 +-
 .../artemis/core/io/mapped/BytesUtils.java      |  26 ++-
 .../core/io/mapped/MappedByteBufferCache.java   | 224 -------------------
 .../artemis/core/io/mapped/MappedFile.java      | 184 ++++++++-------
 .../core/io/mapped/MappedSequentialFile.java    |  74 +++---
 .../io/mapped/MappedSequentialFileFactory.java  | 131 +++++++----
 .../core/io/mapped/TimedSequentialFile.java     | 164 ++++++--------
 .../artemis/core/io/JournalTptBenchmark.java    |  17 +-
 .../core/io/SequentialFileTptBenchmark.java     |  21 +-
 .../impl/journal/JournalStorageManager.java     |   3 +-
 tests/extra-tests/pom.xml                       |  28 ---
 .../journal/JournalImplLatencyBench.java        | 153 -------------
 .../journal/gcfree/AddJournalRecordEncoder.java | 105 ---------
 .../journal/gcfree/EncodersBench.java           | 114 ----------
 .../journal/gcfree/GcFreeJournal.java           |  80 -------
 .../gcfree/GcFreeJournalLatencyBench.java       | 131 -----------
 .../journal/gcfree/JournalRecordHeader.java     |  27 ---
 .../journal/gcfree/JournalRecordTypes.java      |  29 ---
 .../SequentialFileLatencyBench.java             | 128 -----------
 .../journal/MappedImportExportTest.java         |  20 +-
 .../journal/MappedJournalCompactTest.java       |  20 +-
 .../journal/MappedJournalImplTest.java          |  20 +-
 .../MappedSequentialFileFactoryTest.java        |   4 +-
 .../journal/ValidateTransactionHealthTest.java  |   9 +-
 24 files changed, 391 insertions(+), 1332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index 25c8b27..860bcb6 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -24,7 +24,6 @@ import java.text.DecimalFormat;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
@@ -190,13 +189,9 @@ public class SyncCalculation {
             ((AIOSequentialFileFactory) factory).disableBufferReuse();
             return factory;
          case MAPPED:
-            factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() {
-               @Override
-               public void onIOException(Throwable code, String message, SequentialFile file) {
-
-               }
-            }, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync);
-
+            factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null)
+               .setDatasync(datasync)
+               .disableBufferReuse();
             factory.start();
             return factory;
          default:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
index 3ff723f..986b698 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
@@ -29,8 +29,32 @@ final class BytesUtils {
       return (value + (alignment - 1)) & ~(alignment - 1);
    }
 
+   /**
+    * Is a value a positive power of two.
+    *
+    * @param value to be checked.
+    * @return true if the number is a positive power of two otherwise false.
+    */
+   public static boolean isPowOf2(final int value) {
+      return Integer.bitCount(value) == 1;
+   }
+
+   /**
+    * Test if a value is pow2alignment-aligned.
+    *
+    * @param value         to be tested.
+    * @param pow2alignment boundary the address is tested against.
+    * @return true if the address is on the aligned boundary otherwise false.
+    * @throws IllegalArgumentException if the alignment is not a power of 2
+    */
+   public static boolean isAligned(final long value, final int pow2alignment) {
+      if (!isPowOf2(pow2alignment)) {
+         throw new IllegalArgumentException("Alignment must be a power of 2");
+      }
+      return (value & (pow2alignment - 1)) == 0;
+   }
+
    public static void zerosDirect(final ByteBuffer buffer) {
-      //TODO When PlatformDependent will be replaced by VarHandle or Unsafe, replace with safepoint-fixed setMemory
       //DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops
       int remaining = buffer.capacity();
       long address = PlatformDependent.directBufferAddress(buffer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java
deleted file mode 100644
index 73384c8..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.io.mapped;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.lang.ref.WeakReference;
-import java.nio.ByteOrder;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-
-import io.netty.util.internal.PlatformDependent;
-
-final class MappedByteBufferCache implements AutoCloseable {
-
-   public static final int PAGE_SIZE = Integer.parseInt(System.getProperty("os_page_size", "4096"));
-   private static final Object FILE_LOCK = new Object();
-   private final RandomAccessFile raf;
-   private final FileChannel fileChannel;
-   private final long chunkBytes;
-   private final long overlapBytes;
-   private final ArrayList<WeakReference<MappedByteBuffer>> byteBuffers;
-   private final File file;
-   private final long mappedSize;
-   private boolean closed;
-
-   private MappedByteBufferCache(File file, RandomAccessFile raf, long chunkBytes, long overlapBytes, long alignment) {
-      this.byteBuffers = new ArrayList<>();
-      this.file = file;
-      this.raf = raf;
-      this.fileChannel = raf.getChannel();
-      this.chunkBytes = BytesUtils.align(chunkBytes, alignment);
-      this.overlapBytes = BytesUtils.align(overlapBytes, alignment);
-      this.closed = false;
-      this.mappedSize = this.chunkBytes + this.overlapBytes;
-   }
-
-   public static MappedByteBufferCache of(File file, long chunkSize, long overlapSize) throws FileNotFoundException {
-      final RandomAccessFile raf = new RandomAccessFile(file, "rw");
-      return new MappedByteBufferCache(file, raf, chunkSize, overlapSize, PAGE_SIZE);
-   }
-
-   public static boolean inside(long position, long mappedPosition, long mappedLimit) {
-      return mappedPosition <= position && position < mappedLimit;
-   }
-
-   public File file() {
-      return file;
-   }
-
-   public long chunkBytes() {
-      return chunkBytes;
-   }
-
-   public long overlapBytes() {
-      return overlapBytes;
-   }
-
-   public int indexFor(long position) {
-      final int chunk = (int) (position / chunkBytes);
-      return chunk;
-   }
-
-   public long mappedPositionFor(int index) {
-      return index * chunkBytes;
-   }
-
-   public long mappedLimitFor(long mappedPosition) {
-      return mappedPosition + chunkBytes;
-   }
-
-   public MappedByteBuffer acquireMappedByteBuffer(final int index) throws IOException, IllegalArgumentException, IllegalStateException {
-      if (closed)
-         throw new IOException("Closed");
-      if (index < 0)
-         throw new IOException("Attempt to access a negative index: " + index);
-      while (byteBuffers.size() <= index) {
-         byteBuffers.add(null);
-      }
-      final WeakReference<MappedByteBuffer> mbbRef = byteBuffers.get(index);
-      if (mbbRef != null) {
-         final MappedByteBuffer mbb = mbbRef.get();
-         if (mbb != null) {
-            return mbb;
-         }
-      }
-      return mapAndAcquire(index);
-   }
-
-   //METHOD BUILT TO SEPARATE THE SLOW PATH TO ENSURE INLINING OF THE MOST OCCURRING CASE
-   private MappedByteBuffer mapAndAcquire(final int index) throws IOException {
-      final long chunkStartPosition = mappedPositionFor(index);
-      final long minSize = chunkStartPosition + mappedSize;
-      if (fileChannel.size() < minSize) {
-         try {
-            synchronized (FILE_LOCK) {
-               try (FileLock lock = fileChannel.lock()) {
-                  final long size = fileChannel.size();
-                  if (size < minSize) {
-                     raf.setLength(minSize);
-                  }
-               }
-            }
-         } catch (IOException ioe) {
-            throw new IOException("Failed to resize to " + minSize, ioe);
-         }
-      }
-
-      final MappedByteBuffer mbb = fileChannel.map(FileChannel.MapMode.READ_WRITE, chunkStartPosition, mappedSize);
-      mbb.order(ByteOrder.nativeOrder());
-      byteBuffers.set(index, new WeakReference<>(mbb));
-      return mbb;
-   }
-
-   public long fileSize() throws IOException {
-      if (closed)
-         throw new IllegalStateException("Closed");
-      return fileChannel.size();
-   }
-
-   public void closeAndResize(long length) {
-      if (!closed) {
-         //TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memmory mapped file!
-         for (final WeakReference<MappedByteBuffer> mbbRef : this.byteBuffers) {
-            if (mbbRef != null) {
-               final MappedByteBuffer mbb = mbbRef.get();
-               if (mbb != null) {
-                  try {
-                     PlatformDependent.freeDirectBuffer(mbb);
-                  } catch (Throwable t) {
-                     //TO_FIX: force releasing of the other buffers
-                  }
-               }
-            }
-         }
-         this.byteBuffers.clear();
-         try {
-            if (fileChannel.size() != length) {
-               try {
-                  synchronized (FILE_LOCK) {
-                     try (FileLock lock = fileChannel.lock()) {
-                        final long size = fileChannel.size();
-                        if (size != length) {
-                           raf.setLength(length);
-                        }
-                     }
-                  }
-               } catch (IOException ioe) {
-                  throw new IllegalStateException("Failed to resize to " + length, ioe);
-               }
-            }
-         } catch (IOException ex) {
-            throw new IllegalStateException("Failed to get size", ex);
-         } finally {
-            try {
-               fileChannel.close();
-            } catch (IOException e) {
-               throw new IllegalStateException("Failed to close channel", e);
-            } finally {
-               try {
-                  raf.close();
-               } catch (IOException e) {
-                  throw new IllegalStateException("Failed to close RandomAccessFile", e);
-               }
-            }
-            closed = true;
-         }
-      }
-   }
-
-   public boolean isClosed() {
-      return closed;
-   }
-
-   @Override
-   public void close() {
-      if (!closed) {
-         //TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memory mapped file!
-         for (final WeakReference<MappedByteBuffer> mbbRef : this.byteBuffers) {
-            if (mbbRef != null) {
-               final MappedByteBuffer mbb = mbbRef.get();
-               if (mbb != null) {
-                  try {
-                     PlatformDependent.freeDirectBuffer(mbb);
-                  } catch (Throwable t) {
-                     //TO_FIX: force releasing of the other buffers
-                  }
-               }
-            }
-         }
-         this.byteBuffers.clear();
-         try {
-            fileChannel.close();
-         } catch (IOException e) {
-            throw new IllegalStateException("Failed to close channel", e);
-         } finally {
-            try {
-               raf.close();
-            } catch (IOException e) {
-               throw new IllegalStateException("Failed to close RandomAccessFile", e);
-            }
-         }
-         closed = true;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
index adfc4fe..eb39320 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
@@ -18,79 +18,66 @@ package org.apache.activemq.artemis.core.io.mapped;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
 import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.Env;
 
 final class MappedFile implements AutoCloseable {
 
-   private final MappedByteBufferCache cache;
+   private static final int OS_PAGE_SIZE = Env.osPageSize();
+   private final MappedByteBuffer buffer;
+   private final FileChannel channel;
+   private final long address;
    private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
    private final ChannelBufferWrapper channelBufferWrapper;
-   private MappedByteBuffer lastMapped;
-   private long lastMappedStart;
-   private long lastMappedLimit;
-   private long position;
-   private long length;
+   private int position;
+   private int length;
 
-   private MappedFile(MappedByteBufferCache cache) throws IOException {
-      this.cache = cache;
-      this.lastMapped = null;
-      this.lastMappedStart = -1;
-      this.lastMappedLimit = -1;
-      this.position = 0;
-      this.length = this.cache.fileSize();
+   private MappedFile(FileChannel channel, MappedByteBuffer byteBuffer, int position, int length) throws IOException {
+      this.channel = channel;
+      this.buffer = byteBuffer;
+      this.position = position;
+      this.length = length;
       this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
       this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
+      this.address = PlatformDependent.directBufferAddress(buffer);
    }
 
-   public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
-      return new MappedFile(MappedByteBufferCache.of(file, chunckSize, overlapSize));
+   public static MappedFile of(File file, int position, int capacity) throws IOException {
+      final MappedByteBuffer buffer;
+      final int length;
+      final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
+      length = (int) channel.size();
+      if (length != capacity && length != 0) {
+         channel.close();
+         throw new IllegalStateException("the file is not " + capacity + " bytes long!");
+      }
+      buffer = channel.map(FileChannel.MapMode.READ_WRITE, position, capacity);
+      return new MappedFile(channel, buffer, 0, length);
    }
 
-   public MappedByteBufferCache cache() {
-      return cache;
+   public FileChannel channel() {
+      return channel;
    }
 
-   private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
-      if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
-         return updateOffset(offset, bytes);
-      } else {
-         final int bufferPosition = (int) (offset - lastMappedStart);
-         return bufferPosition;
-      }
+   public MappedByteBuffer mapped() {
+      return buffer;
    }
 
-   private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
-      try {
-         final int index = cache.indexFor(offset);
-         final long mappedPosition = cache.mappedPositionFor(index);
-         final long mappedLimit = cache.mappedLimitFor(mappedPosition);
-         if (offset + bytes > mappedLimit) {
-            throw new IOException("mapping overflow!");
-         }
-         lastMapped = cache.acquireMappedByteBuffer(index);
-         lastMappedStart = mappedPosition;
-         lastMappedLimit = mappedLimit;
-         final int bufferPosition = (int) (offset - mappedPosition);
-         return bufferPosition;
-      } catch (IllegalStateException e) {
-         throw new IOException(e);
-      } catch (IllegalArgumentException e) {
-         throw new BufferUnderflowException();
-      }
+   public long address() {
+      return this.address;
    }
 
    public void force() {
-      if (lastMapped != null) {
-         lastMapped.force();
-      }
+      this.buffer.force();
    }
 
    /**
@@ -98,9 +85,8 @@ final class MappedFile implements AutoCloseable {
     * <p>
     * <p> Bytes are read starting at this file's specified position.
     */
-   public int read(long position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
-      final int bufferPosition = checkOffset(position, dstLength);
-      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+   public int read(int position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
+      final long srcAddress = this.address + position;
       if (dst.hasMemoryAddress()) {
          final long dstAddress = dst.memoryAddress() + dstStart;
          PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
@@ -122,9 +108,8 @@ final class MappedFile implements AutoCloseable {
     * <p>
     * <p> Bytes are read starting at this file's specified position.
     */
-   public int read(long position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
-      final int bufferPosition = checkOffset(position, dstLength);
-      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+   public int read(int position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
+      final long srcAddress = this.address + position;
       if (dst.isDirect()) {
          final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
          PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
@@ -146,10 +131,9 @@ final class MappedFile implements AutoCloseable {
     * then the position is updated with the number of bytes actually read.
     */
    public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException {
-      final int remaining = (int) Math.min(this.length - this.position, Integer.MAX_VALUE);
+      final int remaining = this.length - this.position;
       final int read = Math.min(remaining, dstLength);
-      final int bufferPosition = checkOffset(position, read);
-      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      final long srcAddress = this.address + position;
       if (dst.hasMemoryAddress()) {
          final long dstAddress = dst.memoryAddress() + dstStart;
          PlatformDependent.copyMemory(srcAddress, dstAddress, read);
@@ -170,10 +154,9 @@ final class MappedFile implements AutoCloseable {
     * then the position is updated with the number of bytes actually read.
     */
    public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException {
-      final int remaining = (int) Math.min(this.length - this.position, Integer.MAX_VALUE);
+      final int remaining = this.length - this.position;
       final int read = Math.min(remaining, dstLength);
-      final int bufferPosition = checkOffset(position, read);
-      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      final long srcAddress = this.address + position;
       if (dst.isDirect()) {
          final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
          PlatformDependent.copyMemory(srcAddress, dstAddress, read);
@@ -192,8 +175,7 @@ final class MappedFile implements AutoCloseable {
     */
    public void write(EncodingSupport encodingSupport) throws IOException {
       final int encodedSize = encodingSupport.getEncodeSize();
-      final int bufferPosition = checkOffset(position, encodedSize);
-      this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize);
+      this.byteBufWrapper.wrap(this.buffer, this.position, encodedSize);
       try {
          encodingSupport.encode(this.channelBufferWrapper);
       } finally {
@@ -211,8 +193,7 @@ final class MappedFile implements AutoCloseable {
     * <p> Bytes are written starting at this file's current position,
     */
    public void write(ByteBuf src, int srcStart, int srcLength) throws IOException {
-      final int bufferPosition = checkOffset(position, srcLength);
-      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      final long destAddress = this.address + position;
       if (src.hasMemoryAddress()) {
          final long srcAddress = src.memoryAddress() + srcStart;
          PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
@@ -234,8 +215,7 @@ final class MappedFile implements AutoCloseable {
     * <p> Bytes are written starting at this file's current position,
     */
    public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException {
-      final int bufferPosition = checkOffset(position, srcLength);
-      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      final long destAddress = this.address + position;
       if (src.isDirect()) {
          final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
          PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
@@ -254,9 +234,8 @@ final class MappedFile implements AutoCloseable {
     * <p>
     * <p> Bytes are written starting at this file's specified position,
     */
-   public void write(long position, ByteBuf src, int srcStart, int srcLength) throws IOException {
-      final int bufferPosition = checkOffset(position, srcLength);
-      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+   public void write(int position, ByteBuf src, int srcStart, int srcLength) throws IOException {
+      final long destAddress = this.address + position;
       if (src.hasMemoryAddress()) {
          final long srcAddress = src.memoryAddress() + srcStart;
          PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
@@ -277,9 +256,8 @@ final class MappedFile implements AutoCloseable {
     * <p>
     * <p> Bytes are written starting at this file's specified position,
     */
-   public void write(long position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
-      final int bufferPosition = checkOffset(position, srcLength);
-      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+   public void write(int position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
+      final long destAddress = this.address + position;
       if (src.isDirect()) {
          final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
          PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
@@ -298,32 +276,47 @@ final class MappedFile implements AutoCloseable {
     * <p>
     * <p> Bytes are written starting at this file's current position,
     */
-   public void zeros(long offset, int count) throws IOException {
-      while (count > 0) {
-         //do not need to validate the bytes count
-         final int bufferPosition = checkOffset(offset, 0);
-         final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity());
-         final int zeros = endZerosPosition - bufferPosition;
-         final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
-         PlatformDependent.setMemory(destAddress, zeros, (byte) 0);
-         offset += zeros;
-         count -= zeros;
-         //TODO need to call force on each write?
-         //this.force();
+   public void zeros(int position, final int count) throws IOException {
+      //zeroes memory in reverse direction in OS_PAGE_SIZE batches
+      //to gain sympathy by the page cache LRU policy
+      final long start = this.address + position;
+      final long end = start + count;
+      int toZeros = count;
+      final long lastGap = (int) (end & (OS_PAGE_SIZE - 1));
+      final long lastStartPage = end - lastGap;
+      long lastZeroed = end;
+      if (start <= lastStartPage) {
+         if (lastGap > 0) {
+            PlatformDependent.setMemory(lastStartPage, lastGap, (byte) 0);
+            lastZeroed = lastStartPage;
+            toZeros -= lastGap;
+         }
+      }
+      //any that will enter has lastZeroed OS page aligned
+      while (toZeros >= OS_PAGE_SIZE) {
+         assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
+         final long startPage = lastZeroed - OS_PAGE_SIZE;
+         PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0);
+         lastZeroed = startPage;
+         toZeros -= OS_PAGE_SIZE;
       }
-      if (offset > this.length) {
-         this.length = offset;
+      //there is anything left in the first OS page?
+      if (toZeros > 0) {
+         PlatformDependent.setMemory(start, toZeros, (byte) 0);
+      }
+
+      position += count;
+      if (position > this.length) {
+         this.length = position;
       }
    }
 
-   public long position() {
+   public int position() {
       return position;
    }
 
-   public long position(long newPosition) {
-      final long oldPosition = this.position;
-      this.position = newPosition;
-      return oldPosition;
+   public void position(int position) {
+      this.position = position;
    }
 
    public long length() {
@@ -332,10 +325,13 @@ final class MappedFile implements AutoCloseable {
 
    @Override
    public void close() {
-      cache.close();
-   }
-
-   public void closeAndResize(long length) {
-      cache.closeAndResize(length);
+      try {
+         channel.close();
+      } catch (IOException e) {
+         throw new IllegalStateException(e);
+      } finally {
+         //unmap in a deterministic way: do not rely on GC to do it
+         PlatformDependent.freeDirectBuffer(this.buffer);
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index 12e359c..0091f30 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -38,32 +38,37 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 final class MappedSequentialFile implements SequentialFile {
 
    private final File directory;
-   private final long chunkBytes;
-   private final long overlapBytes;
    private final IOCriticalErrorListener criticalErrorListener;
    private final MappedSequentialFileFactory factory;
    private File file;
    private File absoluteFile;
    private String fileName;
    private MappedFile mappedFile;
+   private int capacity;
 
    MappedSequentialFile(MappedSequentialFileFactory factory,
                         final File directory,
                         final File file,
-                        final long chunkBytes,
-                        final long overlapBytes,
+                        final int capacity,
                         final IOCriticalErrorListener criticalErrorListener) {
       this.factory = factory;
       this.directory = directory;
       this.file = file;
       this.absoluteFile = null;
       this.fileName = null;
-      this.chunkBytes = chunkBytes;
-      this.overlapBytes = overlapBytes;
+      this.capacity = capacity;
       this.mappedFile = null;
       this.criticalErrorListener = criticalErrorListener;
    }
 
+   public MappedFile mappedFile() {
+      return mappedFile;
+   }
+
+   public int capacity() {
+      return this.capacity;
+   }
+
    private void checkIsOpen() {
       if (!isOpen()) {
          throw new IllegalStateException("File not opened!");
@@ -95,7 +100,7 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public void open() throws IOException {
       if (this.mappedFile == null) {
-         this.mappedFile = MappedFile.of(file, chunkBytes, overlapBytes);
+         this.mappedFile = MappedFile.of(this.file, 0, this.capacity);
       }
    }
 
@@ -129,7 +134,11 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public void fill(int size) throws IOException {
       checkIsOpen();
+      //the fill will give a big performance hit when done in parallel of other writings!
       this.mappedFile.zeros(this.mappedFile.position(), size);
+      if (factory.isDatasync()) {
+         this.mappedFile.force();
+      }
    }
 
    @Override
@@ -214,11 +223,11 @@ final class MappedSequentialFile implements SequentialFile {
 
    @Override
    public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
-      if (callback == null) {
-         throw new NullPointerException("callback parameter need to be set");
-      }
-      checkIsOpen(callback);
       try {
+         if (callback == null) {
+            throw new NullPointerException("callback parameter need to be set");
+         }
+         checkIsOpen(callback);
          final int position = bytes.position();
          final int limit = bytes.limit();
          final int remaining = limit - position;
@@ -237,22 +246,28 @@ final class MappedSequentialFile implements SequentialFile {
          }
          callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
          throw new RuntimeException(e);
+      } finally {
+         this.factory.releaseBuffer(bytes);
       }
    }
 
    @Override
    public void writeDirect(ByteBuffer bytes, boolean sync) throws IOException {
-      checkIsOpen();
-      final int position = bytes.position();
-      final int limit = bytes.limit();
-      final int remaining = limit - position;
-      if (remaining > 0) {
-         this.mappedFile.write(bytes, position, remaining);
-         final int newPosition = position + remaining;
-         bytes.position(newPosition);
-         if (factory.isDatasync() && sync) {
-            this.mappedFile.force();
+      try {
+         checkIsOpen();
+         final int position = bytes.position();
+         final int limit = bytes.limit();
+         final int remaining = limit - position;
+         if (remaining > 0) {
+            this.mappedFile.write(bytes, position, remaining);
+            final int newPosition = position + remaining;
+            bytes.position(newPosition);
+            if (factory.isDatasync() && sync) {
+               this.mappedFile.force();
+            }
          }
+      } finally {
+         this.factory.releaseBuffer(bytes);
       }
    }
 
@@ -304,8 +319,11 @@ final class MappedSequentialFile implements SequentialFile {
 
    @Override
    public void position(long pos) {
+      if (pos > Integer.MAX_VALUE) {
+         throw new IllegalArgumentException("pos must be < " + Integer.MAX_VALUE);
+      }
       checkIsOpen();
-      this.mappedFile.position(pos);
+      this.mappedFile.position((int) pos);
    }
 
    @Override
@@ -317,7 +335,7 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public void close() {
       if (this.mappedFile != null) {
-         this.mappedFile.closeAndResize(this.mappedFile.length());
+         this.mappedFile.close();
          this.mappedFile = null;
       }
    }
@@ -325,7 +343,9 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public void sync() throws IOException {
       checkIsOpen();
-      this.mappedFile.force();
+      if (factory.isDatasync()) {
+         this.mappedFile.force();
+      }
    }
 
    @Override
@@ -363,9 +383,9 @@ final class MappedSequentialFile implements SequentialFile {
    }
 
    @Override
-   public SequentialFile cloneFile() {
+   public MappedSequentialFile cloneFile() {
       checkIsNotOpen();
-      return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
+      return new MappedSequentialFile(this.factory, this.directory, this.file, this.capacity, this.criticalErrorListener);
    }
 
    @Override
@@ -404,4 +424,4 @@ final class MappedSequentialFile implements SequentialFile {
       }
       return this.absoluteFile;
    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index c4b7d30..a05d322 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -29,62 +29,64 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.utils.Env;
 
 public final class MappedSequentialFileFactory implements SequentialFileFactory {
 
-   private static long DEFAULT_BLOCK_SIZE = 64L << 20;
    private final File directory;
+   private int capacity;
    private final IOCriticalErrorListener criticalErrorListener;
    private final TimedBuffer timedBuffer;
-   private long chunkBytes;
-   private long overlapBytes;
    private boolean useDataSync;
-   private boolean supportCallbacks;
-
-   protected volatile int alignment = -1;
-
-   public MappedSequentialFileFactory(File directory,
-                                      IOCriticalErrorListener criticalErrorListener,
-                                      boolean supportCallbacks) {
+   private boolean bufferPooling;
+   //pools only the biggest one -> optimized for the common case
+   private final ThreadLocal<ByteBuffer> bytesPool;
+
+   private MappedSequentialFileFactory(File directory,
+                                       int capacity,
+                                       final boolean buffered,
+                                       final int bufferSize,
+                                       final int bufferTimeout,
+                                       IOCriticalErrorListener criticalErrorListener) {
       this.directory = directory;
+      this.capacity = capacity;
       this.criticalErrorListener = criticalErrorListener;
-      this.chunkBytes = DEFAULT_BLOCK_SIZE;
-      this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
       this.useDataSync = true;
-      this.timedBuffer = null;
-      this.supportCallbacks = supportCallbacks;
-   }
-
-   public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
-      this(directory, criticalErrorListener, false);
-   }
-
-   public MappedSequentialFileFactory(File directory) {
-      this(directory, null);
+      if (buffered && bufferTimeout > 0 && bufferSize > 0) {
+         timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, false);
+      } else {
+         timedBuffer = null;
+      }
+      this.bufferPooling = true;
+      this.bytesPool = new ThreadLocal<>();
    }
 
-
-   public long chunkBytes() {
-      return chunkBytes;
+   public MappedSequentialFileFactory capacity(int capacity) {
+      this.capacity = capacity;
+      return this;
    }
 
-   public MappedSequentialFileFactory chunkBytes(long chunkBytes) {
-      this.chunkBytes = chunkBytes;
-      return this;
+   public int capacity() {
+      return capacity;
    }
 
-   public long overlapBytes() {
-      return overlapBytes;
+   public static MappedSequentialFileFactory buffered(File directory,
+                                                      int capacity,
+                                                      final int bufferSize,
+                                                      final int bufferTimeout,
+                                                      IOCriticalErrorListener criticalErrorListener) {
+      return new MappedSequentialFileFactory(directory, capacity, true, bufferSize, bufferTimeout, criticalErrorListener);
    }
 
-   public MappedSequentialFileFactory overlapBytes(long overlapBytes) {
-      this.overlapBytes = overlapBytes;
-      return this;
+   public static MappedSequentialFileFactory unbuffered(File directory,
+                                                        int capacity,
+                                                        IOCriticalErrorListener criticalErrorListener) {
+      return new MappedSequentialFileFactory(directory, capacity, false, 0, 0, criticalErrorListener);
    }
 
    @Override
    public SequentialFile createSequentialFile(String fileName) {
-      final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+      final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), capacity, criticalErrorListener);
       if (this.timedBuffer == null) {
          return mappedSequentialFile;
       } else {
@@ -93,7 +95,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
    }
 
    @Override
-   public SequentialFileFactory setDatasync(boolean enabled) {
+   public MappedSequentialFileFactory setDatasync(boolean enabled) {
       this.useDataSync = enabled;
       return this;
    }
@@ -120,7 +122,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public boolean isSupportsCallbacks() {
-      return this.supportCallbacks;
+      return timedBuffer != null;
    }
 
    @Override
@@ -132,23 +134,65 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public ByteBuffer allocateDirectBuffer(final int size) {
-      return ByteBuffer.allocateDirect(size);
+      final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
+      final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
+      byteBuffer.limit(size);
+      return byteBuffer;
    }
 
    @Override
-   public void releaseDirectBuffer(final ByteBuffer buffer) {
+   public void releaseDirectBuffer(ByteBuffer buffer) {
       PlatformDependent.freeDirectBuffer(buffer);
    }
 
+   public MappedSequentialFileFactory enableBufferReuse() {
+      this.bufferPooling = true;
+      return this;
+   }
+
+   public MappedSequentialFileFactory disableBufferReuse() {
+      this.bufferPooling = false;
+      return this;
+   }
+
    @Override
    public ByteBuffer newBuffer(final int size) {
-      return ByteBuffer.allocate(size);
+      if (!this.bufferPooling) {
+         return allocateDirectBuffer(size);
+      } else {
+         final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
+         ByteBuffer byteBuffer = bytesPool.get();
+         if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
+            //do not free the old one (if any) until the new one will be released into the pool!
+            byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
+         } else {
+            bytesPool.set(null);
+            PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
+            byteBuffer.clear();
+         }
+         byteBuffer.limit(size);
+         return byteBuffer;
+      }
    }
 
    @Override
    public void releaseBuffer(ByteBuffer buffer) {
-      if (buffer.isDirect()) {
-         PlatformDependent.freeDirectBuffer(buffer);
+      if (this.bufferPooling) {
+         if (buffer.isDirect()) {
+            final ByteBuffer byteBuffer = bytesPool.get();
+            if (byteBuffer != buffer) {
+               //replace with the current pooled only if greater or null
+               if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
+                  if (byteBuffer != null) {
+                     //free the smaller one
+                     PlatformDependent.freeDirectBuffer(byteBuffer);
+                  }
+                  bytesPool.set(buffer);
+               } else {
+                  PlatformDependent.freeDirectBuffer(buffer);
+               }
+            }
+         }
       }
    }
 
@@ -179,9 +223,9 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
    }
 
    @Override
+   @Deprecated
    public MappedSequentialFileFactory setAlignment(int alignment) {
-      this.alignment = alignment;
-      return this;
+      throw new UnsupportedOperationException("alignment can't be changed!");
    }
 
    @Override
@@ -203,7 +247,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
          //SIMD OPTIMIZATION
          Arrays.fill(array, (byte) 0);
       } else {
-         //TODO VERIFY IF IT COULD HAPPENS
          final int capacity = buffer.capacity();
          for (int i = 0; i < capacity; i++) {
             buffer.put(i, (byte) 0);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
index d376d7d..8436ed5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -20,14 +20,10 @@ package org.apache.activemq.artemis.core.io.mapped;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -35,6 +31,7 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 final class TimedSequentialFile implements SequentialFile {
@@ -42,14 +39,12 @@ final class TimedSequentialFile implements SequentialFile {
    private final SequentialFileFactory factory;
    private final SequentialFile sequentialFile;
    private final LocalBufferObserver observer;
-   private final ThreadLocal<ResettableIOCallback> callbackPool;
    private TimedBuffer timedBuffer;
 
    TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) {
       this.sequentialFile = sequentialFile;
       this.factory = factory;
       this.observer = new LocalBufferObserver();
-      this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new);
    }
 
    @Override
@@ -114,13 +109,10 @@ final class TimedSequentialFile implements SequentialFile {
    public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
       if (sync) {
          if (this.timedBuffer != null) {
-            final ResettableIOCallback callback = callbackPool.get();
-            try {
-               this.timedBuffer.addBytes(bytes, true, callback);
-               callback.waitCompletion();
-            } finally {
-               callback.reset();
-            }
+            //the only way to avoid allocations is by using a lock-free pooled callback -> CyclicBarrier allocates on each new Generation!!!
+            final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
+            this.timedBuffer.addBytes(bytes, true, callback);
+            callback.waitCompletion();
          } else {
             this.sequentialFile.write(bytes, true);
          }
@@ -146,13 +138,10 @@ final class TimedSequentialFile implements SequentialFile {
    public void write(EncodingSupport bytes, boolean sync) throws Exception {
       if (sync) {
          if (this.timedBuffer != null) {
-            final ResettableIOCallback callback = callbackPool.get();
-            try {
-               this.timedBuffer.addBytes(bytes, true, callback);
-               callback.waitCompletion();
-            } finally {
-               callback.reset();
-            }
+            //the only way to avoid allocations is by using a lock-free pooled callback -> CyclicBarrier allocates on each new Generation!!!
+            final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
+            this.timedBuffer.addBytes(bytes, true, callback);
+            callback.waitCompletion();
          } else {
             this.sequentialFile.write(bytes, true);
          }
@@ -197,7 +186,11 @@ final class TimedSequentialFile implements SequentialFile {
 
    @Override
    public void close() throws Exception {
-      this.sequentialFile.close();
+      try {
+         this.sequentialFile.close();
+      } finally {
+         this.timedBuffer = null;
+      }
    }
 
    @Override
@@ -241,128 +234,99 @@ final class TimedSequentialFile implements SequentialFile {
       return this.sequentialFile.getJavaFile();
    }
 
-   private static final class ResettableIOCallback implements IOCallback {
-
-      private final CyclicBarrier cyclicBarrier;
-      private int errorCode;
-      private String errorMessage;
-
-      ResettableIOCallback() {
-         this.cyclicBarrier = new CyclicBarrier(2);
-      }
-
-      public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException {
-         this.cyclicBarrier.await();
-         if (this.errorMessage != null) {
-            throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage);
-         }
-      }
-
-      public void reset() {
-         this.errorCode = 0;
-         this.errorMessage = null;
-      }
-
-      @Override
-      public void done() {
+   private static void invokeDoneOn(List<? extends IOCallback> callbacks) {
+      final int size = callbacks.size();
+      for (int i = 0; i < size; i++) {
          try {
-            this.cyclicBarrier.await();
-         } catch (BrokenBarrierException | InterruptedException e) {
-            throw new IllegalStateException(e);
+            final IOCallback callback = callbacks.get(i);
+            callback.done();
+         } catch (Throwable e) {
+            ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
          }
       }
+   }
 
-      @Override
-      public void onError(int errorCode, String errorMessage) {
+   private static void invokeOnErrorOn(final int errorCode,
+                                       final String errorMessage,
+                                       List<? extends IOCallback> callbacks) {
+      final int size = callbacks.size();
+      for (int i = 0; i < size; i++) {
          try {
-            this.errorCode = errorCode;
-            this.errorMessage = errorMessage;
-            this.cyclicBarrier.await();
-         } catch (BrokenBarrierException | InterruptedException e) {
-            throw new IllegalStateException(e);
+            final IOCallback callback = callbacks.get(i);
+            callback.onError(errorCode, errorMessage);
+         } catch (Throwable e) {
+            ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
          }
       }
    }
 
    private static final class DelegateCallback implements IOCallback {
 
-      final List<IOCallback> delegates;
+      List<IOCallback> delegates;
 
       private DelegateCallback() {
-         this.delegates = new ArrayList<>();
-      }
-
-      public List<IOCallback> delegates() {
-         return this.delegates;
+         this.delegates = null;
       }
 
       @Override
       public void done() {
-         final int size = delegates.size();
-         for (int i = 0; i < size; i++) {
-            try {
-               final IOCallback callback = delegates.get(i);
-               callback.done();
-            } catch (Throwable e) {
-               ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
-            }
-         }
+         invokeDoneOn(delegates);
       }
 
       @Override
       public void onError(final int errorCode, final String errorMessage) {
-         for (IOCallback callback : delegates) {
-            try {
-               callback.onError(errorCode, errorMessage);
-            } catch (Throwable e) {
-               ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
-            }
-         }
+         invokeOnErrorOn(errorCode, errorMessage, delegates);
       }
    }
 
    private final class LocalBufferObserver implements TimedBufferObserver {
 
-      private final ThreadLocal<DelegateCallback> callbacksPool = ThreadLocal.withInitial(DelegateCallback::new);
+      private final DelegateCallback delegateCallback = new DelegateCallback();
 
       @Override
       public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
          buffer.flip();
+
          if (buffer.limit() == 0) {
-            //if there are no bytes to flush, can release the callbacks
-            final int size = callbacks.size();
-            for (int i = 0; i < size; i++) {
-               callbacks.get(i).done();
-            }
-         } else {
-            final DelegateCallback delegateCallback = callbacksPool.get();
-            final int size = callbacks.size();
-            final List<IOCallback> delegates = delegateCallback.delegates();
-            for (int i = 0; i < size; i++) {
-               delegates.add(callbacks.get(i));
-            }
             try {
-               sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
+               invokeDoneOn(callbacks);
             } finally {
-               delegates.clear();
+               factory.releaseBuffer(buffer);
+            }
+         } else {
+            if (callbacks.isEmpty()) {
+               try {
+                  sequentialFile.writeDirect(buffer, requestedSync);
+               } catch (Exception e) {
+                  throw new IllegalStateException(e);
+               }
+            } else {
+               delegateCallback.delegates = callbacks;
+               try {
+                  sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
+               } finally {
+                  delegateCallback.delegates = null;
+               }
             }
          }
       }
 
       @Override
       public ByteBuffer newBuffer(final int size, final int limit) {
-         final int alignedSize = factory.calculateBlockSize(size);
-         final int alignedLimit = factory.calculateBlockSize(limit);
-         final ByteBuffer buffer = factory.newBuffer(alignedSize);
-         buffer.limit(alignedLimit);
-         return buffer;
+         return factory.newBuffer(limit);
       }
 
       @Override
       public int getRemainingBytes() {
          try {
-            final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE);
-            return remaining;
+            final long position = sequentialFile.position();
+            final long size = sequentialFile.size();
+            final long remaining = size - position;
+            if (remaining > Integer.MAX_VALUE) {
+               return Integer.MAX_VALUE;
+            } else {
+               return (int) remaining;
+            }
          } catch (Exception e) {
             throw new IllegalStateException(e);
          }
@@ -370,7 +334,7 @@ final class TimedSequentialFile implements SequentialFile {
 
       @Override
       public String toString() {
-         return "TimedBufferObserver on file (" + getFileName() + ")";
+         return "TimedBufferObserver on file (" + sequentialFile.getFileName() + ")";
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
index b426219..d0bada8 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
@@ -37,7 +37,6 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.jlibaio.LibaioContext;
 
 /**
@@ -47,12 +46,12 @@ public class JournalTptBenchmark {
 
    public static void main(String[] args) throws Exception {
       final boolean useDefaultIoExecutor = true;
-      final int fileSize = 1024 * 1024;
-      final boolean dataSync = true;
+      final int fileSize = 10 * 1024 * 1024;
+      final boolean dataSync = false;
       final Type type = Type.Mapped;
-      final int tests = 5;
+      final int tests = 10;
       final int warmup = 20_000;
-      final int measurements = 20_000;
+      final int measurements = 100_000;
       final int msgSize = 100;
       final byte[] msgContent = new byte[msgSize];
       Arrays.fill(msgContent, (byte) 1);
@@ -63,8 +62,8 @@ public class JournalTptBenchmark {
       switch (type) {
 
          case Mapped:
-            final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
-            factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync);
+            factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
+               .setDatasync(dataSync);
             break;
          case Nio:
             factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
@@ -195,9 +194,7 @@ public class JournalTptBenchmark {
 
    private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
       journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
-      final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback();
-      journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback);
-      ioCallback.waitCompletion();
+      journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true);
    }
 
    private enum Type {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
index 7756a06..7f2641a 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.LockSupport;
 
 import org.apache.activemq.artemis.ArtemisConstants;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -41,12 +40,12 @@ public class SequentialFileTptBenchmark {
    private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
 
    public static void main(String[] args) throws Exception {
-      final boolean dataSync = true;
+      final boolean dataSync = false;
       final boolean writeSync = true;
       final Type type = Type.Mapped;
       final int tests = 10;
       final int warmup = 20_000;
-      final int measurements = 20_000;
+      final int measurements = 100_000;
       final int msgSize = 100;
       final byte[] msgContent = new byte[msgSize];
       Arrays.fill(msgContent, (byte) 1);
@@ -56,10 +55,8 @@ public class SequentialFileTptBenchmark {
       switch (type) {
 
          case Mapped:
-            final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
-            final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize);
-            final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
-            factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync);
+            final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
+            factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
             break;
          case Nio:
             factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
@@ -147,9 +144,9 @@ public class SequentialFileTptBenchmark {
                              boolean sync) throws Exception {
       //this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
       if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
-         final FastWaitIOCallback ioCallback = CALLBACK.reset();
-         sequentialFile.write(encodingSupport, sync, ioCallback);
-         ioCallback.waitCompletion();
+         CALLBACK.reset();
+         sequentialFile.write(encodingSupport, sync, CALLBACK);
+         CALLBACK.waitCompletion();
       } else {
          throw new IllegalStateException("can't happen!");
       }
@@ -189,11 +186,7 @@ public class SequentialFileTptBenchmark {
       }
 
       public void waitCompletion() throws InterruptedException, ActiveMQException {
-         final Thread currentThread = Thread.currentThread();
          while (!done.get()) {
-            LockSupport.parkNanos(1L);
-            if (currentThread.isInterrupted())
-               throw new InterruptedException();
          }
          if (errorMessage != null) {
             throw ActiveMQExceptionType.createException(errorCode, errorMessage);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index ba7bb86..148c1f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -137,8 +137,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             break;
          case MAPPED:
             ActiveMQServerLogger.LOGGER.journalUseMAPPED();
-            //the mapped version do not need buffering by default
-            journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0);
+            journalFF = MappedSequentialFileFactory.buffered(config.getJournalLocation(), config.getJournalFileSize(), config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
             break;
          default:
             throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
index 6ba2c55..4a565cf 100644
--- a/tests/extra-tests/pom.xml
+++ b/tests/extra-tests/pom.xml
@@ -223,34 +223,6 @@
          <artifactId>jbossjts-jacorb</artifactId>
          <version>4.17.13.Final</version>
       </dependency>
-
-      <!-- ### Benchmark Tools -->
-      <!-- ### Java Latency Benchmarking Harness -->
-      <dependency>
-         <groupId>net.openhft</groupId>
-         <artifactId>chronicle-core</artifactId>
-         <version>${openhft.core.version}</version>
-         <!-- License: Apache 2.0 -->
-      </dependency>
-      <dependency>
-         <groupId>net.openhft</groupId>
-         <artifactId>affinity</artifactId>
-         <version>${openhft.affinity.version}</version>
-         <!-- License: LGPLv3-->
-      </dependency>
-      <!-- ### Java Microbenchmark Harness -->
-      <dependency>
-         <groupId>org.openjdk.jmh</groupId>
-         <artifactId>jmh-core</artifactId>
-         <version>${openjdk.jmh.version}</version>
-         <!-- License: GPLv2-->
-      </dependency>
-      <dependency>
-         <groupId>org.openjdk.jmh</groupId>
-         <artifactId>jmh-generator-annprocess</artifactId>
-         <version>${openjdk.jmh.version}</version>
-         <!-- License: GPLv2-->
-      </dependency>
    </dependencies>
 
    <build>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java
deleted file mode 100644
index 3304f15..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.benchmarks.journal;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-
-import net.openhft.chronicle.core.jlbh.JLBH;
-import net.openhft.chronicle.core.jlbh.JLBHOptions;
-import net.openhft.chronicle.core.jlbh.JLBHTask;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.Journal;
-import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-
-public class JournalImplLatencyBench implements JLBHTask {
-
-   private static final int FILE_SIZE = 1024 * 1024 * 1024;
-   private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
-   private static final int ITERATIONS = 100_000;
-   private static final int WARMUP_ITERATIONS = 20_000;
-   private static final int TARGET_THROUGHPUT = 50_000;
-   private static final int TESTS = 5;
-   private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
-   private static int ENCODED_SIZE = 8;
-   private static int CHUNK_BYTES = FILE_SIZE;
-   private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
-   private final SequentialFileFactory sequentialFileFactory;
-   private Journal journal;
-   private EncodingSupport encodingSupport;
-   private JLBH jlbh;
-   private long id;
-
-   public JournalImplLatencyBench(SequentialFileFactory sequentialFileFactory) {
-      this.sequentialFileFactory = sequentialFileFactory;
-   }
-
-   public static void main(String[] args) throws IOException {
-      final File journalDir = Files.createTempDirectory("seq_files").toFile();
-      journalDir.deleteOnExit();
-      final boolean buffered = false;
-      final int bufferSize = 4096;
-      final int bufferTimeout = 0;
-      final int maxIO = -1;
-      final boolean logRates = false;
-      final IOCriticalErrorListener criticalErrorListener = null;
-      final SequentialFileFactory sequentialFileFactory;
-      switch (JOURNAL_TYPE) {
-         case MAPPED:
-            sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
-            break;
-         case NIO:
-            sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
-            break;
-
-         default:
-            throw new AssertionError("!?");
-      }
-      final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new JournalImplLatencyBench(sequentialFileFactory));
-      new JLBH(lth).start();
-   }
-
-   @Override
-   public void init(JLBH jlbh) {
-      id = 0;
-      this.jlbh = jlbh;
-      int numFiles = (int) ((TOTAL_MESSAGES * 1024 + 512) / FILE_SIZE * 1.3);
-      if (numFiles < 2) {
-         numFiles = 2;
-      }
-      this.journal = new JournalImpl(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
-      this.encodingSupport = NilEncodingSupport.Instance;
-      try {
-         journal.start();
-         journal.load(new ArrayList<RecordInfo>(), null, null);
-      } catch (Exception e) {
-         throw new RuntimeException(e);
-      }
-
-   }
-
-   @Override
-   public void run(long startTimeNS) {
-      id++;
-      try {
-         journal.appendAddRecord(id, (byte) 0, encodingSupport, false);
-      } catch (Exception e) {
-         throw new RuntimeException(e);
-      }
-      jlbh.sample(System.nanoTime() - startTimeNS);
-   }
-
-   @Override
-   public void complete() {
-      try {
-         journal.stop();
-         for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
-            journalFile.deleteOnExit();
-         }
-      } catch (Exception e) {
-         throw new RuntimeException(e);
-      }
-   }
-
-   private enum JournalType {
-      MAPPED,
-      NIO
-   }
-
-   private enum NilEncodingSupport implements EncodingSupport {
-      Instance;
-
-      @Override
-      public int getEncodeSize() {
-         return ENCODED_SIZE;
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         final int writerIndex = buffer.writerIndex();
-         for (int i = 0; i < ENCODED_SIZE; i++) {
-            buffer.writeByte((byte) 0);
-         }
-         buffer.writerIndex(writerIndex + ENCODED_SIZE);
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         throw new UnsupportedOperationException();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java
deleted file mode 100644
index 1096c63..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/AddJournalRecordEncoder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
-
-import java.nio.ByteBuffer;
-
-import io.netty.util.internal.PlatformDependent;
-
-/**
- * IT IS NOT A FLYWEIGHT BUT AN ENCODER: NEED TO RESPECT THE SEQUENCE OF WRITE:
- * FileId<CompactCount<Id<RecordType<RecordBytes
- */
-final class AddJournalRecordEncoder {
-
-   private static final int FILE_ID_OFFSET = 0;
-   private static final int COMPACT_COUNT_OFFSET = FILE_ID_OFFSET + 4;
-   private static final int ID_OFFSET = COMPACT_COUNT_OFFSET + 4;
-   private static final int RECORD_TYPE_OFFSET = ID_OFFSET + 8;
-   public static final int BLOCK_SIZE = RECORD_TYPE_OFFSET + 4;
-
-   private ByteBuffer bytes;
-   private int offset;
-   private int limit;
-
-   public static int expectedSize(int recordBytes) {
-      return BLOCK_SIZE + 4 + recordBytes;
-   }
-
-   public ByteBuffer bytes() {
-      return bytes;
-   }
-
-   public int offset() {
-      return this.offset;
-   }
-
-   public int limit() {
-      return this.limit;
-   }
-
-   public void limit(int limit) {
-      this.limit = limit;
-   }
-
-   public AddJournalRecordEncoder on(ByteBuffer bytes, int offset) {
-      this.bytes = bytes;
-      this.offset = offset;
-      this.limit = offset + BLOCK_SIZE;
-      return this;
-   }
-
-   public AddJournalRecordEncoder fileId(int value) {
-      this.bytes.putInt(offset + FILE_ID_OFFSET, value);
-      return this;
-   }
-
-   public AddJournalRecordEncoder compactCount(int value) {
-      this.bytes.putInt(offset + COMPACT_COUNT_OFFSET, value);
-      return this;
-   }
-
-   public AddJournalRecordEncoder id(long value) {
-      this.bytes.putLong(offset + ID_OFFSET, value);
-      return this;
-   }
-
-   public AddJournalRecordEncoder recordType(int value) {
-      this.bytes.putLong(offset + RECORD_TYPE_OFFSET, value);
-      return this;
-   }
-
-   public AddJournalRecordEncoder noRecord() {
-      this.bytes.putInt(this.limit, 0);
-      this.limit += 4;
-      return this;
-   }
-
-   public AddJournalRecordEncoder record(final ByteBuffer recordBytes, final int recordOffset, final int recordLength) {
-      this.bytes.putInt(this.limit, recordLength);
-      final long dstAddr = PlatformDependent.directBufferAddress(bytes) + this.limit + 4;
-      final long srcAddr = PlatformDependent.directBufferAddress(recordBytes) + recordOffset;
-      PlatformDependent.copyMemory(srcAddr, dstAddr, recordLength);
-      this.limit += (4 + recordLength);
-      return this;
-   }
-
-   public int encodedLength() {
-      return this.limit - this.offset;
-   }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
deleted file mode 100644
index 162a512..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
-import org.apache.activemq.artemis.core.journal.EncoderPersister;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.profile.GCProfiler;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-
-@State(Scope.Thread)
-@BenchmarkMode(value = {Mode.Throughput, Mode.SampleTime})
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
-public class EncodersBench {
-
-   private static final int expectedEncoderSize = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(0);
-   private JournalInternalRecord record;
-   private ByteBuffer byteBuffer;
-   private AddJournalRecordEncoder addJournalRecordEncoder;
-   private ActiveMQBuffer outBuffer;
-
-   public static void main(String[] args) throws RunnerException {
-      final Options opt = new OptionsBuilder().include(EncodersBench.class.getSimpleName()).addProfiler(GCProfiler.class).warmupIterations(5).measurementIterations(5).forks(1).build();
-      new Runner(opt).run();
-   }
-
-   @Setup
-   public void init() {
-      this.byteBuffer = ByteBuffer.allocateDirect(expectedEncoderSize);
-      this.byteBuffer.order(ByteOrder.nativeOrder());
-      this.addJournalRecordEncoder = new AddJournalRecordEncoder();
-
-      this.record = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
-      this.record.setFileID(1);
-      this.record.setCompactCount((short) 1);
-      this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder()));
-   }
-
-   @Benchmark
-   public int encodeAligned() {
-      //Header
-      final long header = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedEncoderSize);
-      this.byteBuffer.putLong(0, header);
-      //FileId<CompactCount<Id<RecordType<RecordBytes
-      return addJournalRecordEncoder.on(byteBuffer, JournalRecordHeader.BYTES).fileId(1).compactCount(1).id(1L).recordType(1).noRecord().encodedLength();
-   }
-
-   @Benchmark
-   public int encodeUnaligned() {
-      outBuffer.clear();
-      record.encode(outBuffer);
-      return record.getEncodeSize();
-   }
-
-   @Benchmark
-   public int encodeUnalignedWithGarbage() {
-      outBuffer.clear();
-      final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, EncoderPersister.getInstance(), ZeroEncodingSupport.Instance);
-      addRecord.setFileID(1);
-      addRecord.setCompactCount((short) 1);
-      addRecord.encode(outBuffer);
-      return addRecord.getEncodeSize();
-   }
-
-   public enum ZeroEncodingSupport implements EncodingSupport {
-      Instance;
-
-      @Override
-      public int getEncodeSize() {
-         return 0;
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7075e2e4/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java
deleted file mode 100644
index d2af867..0000000
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.impl.JournalFile;
-import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-
-final class GcFreeJournal extends JournalImpl {
-
-   private final AddJournalRecordEncoder addJournalRecordEncoder = new AddJournalRecordEncoder();
-   //TODO replace with thread local pools if not single threaded!
-   private ByteBuffer journalRecordBytes = null;
-
-   GcFreeJournal(final int fileSize,
-                 final int minFiles,
-                 final int poolSize,
-                 final int compactMinFiles,
-                 final int compactPercentage,
-                 final SequentialFileFactory fileFactory,
-                 final String filePrefix,
-                 final String fileExtension,
-                 final int maxAIO) {
-      super(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
-   }
-
-   public static int align(final int value, final int alignment) {
-      return (value + (alignment - 1)) & ~(alignment - 1);
-   }
-
-   public void appendAddRecord(final long id,
-                               final int recordType,
-                               final ByteBuffer encodedRecord,
-                               final int offset,
-                               final int length,
-                               final boolean sync) throws Exception {
-      final int expectedLength = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(length);
-      final int alignedLength = align(expectedLength, 8);
-      switchFileIfNecessary(alignedLength);
-      final JournalFile currentFile = getCurrentFile();
-      final int fileId = currentFile.getRecordID();
-      if (this.journalRecordBytes == null || this.journalRecordBytes.capacity() < alignedLength) {
-         final int newPooledLength = align(alignedLength, 4096);
-         //TODO ADD LIMITS OR WARNS IN CASE OF TOO MUCH BIGGER SIZE
-         this.journalRecordBytes = ByteBuffer.allocateDirect(newPooledLength);
-         this.journalRecordBytes.order(ByteOrder.nativeOrder());
-      }
-      final long journalRecordHeader = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedLength);
-      this.journalRecordBytes.putLong(0, journalRecordHeader);
-      //use natural stride while encoding: FileId<CompactCount<Id<RecordType<RecordBytes
-      this.addJournalRecordEncoder.on(this.journalRecordBytes, JournalRecordHeader.BYTES).fileId(fileId).compactCount(0).id(id).recordType(recordType).record(encodedRecord, offset, length);
-      final SequentialFile sequentialFile = currentFile.getFile();
-      try {
-         this.journalRecordBytes.limit(alignedLength);
-         sequentialFile.writeDirect(this.journalRecordBytes, sync);
-      } finally {
-         this.journalRecordBytes.clear();
-      }
-      //TODO AVOID INDEXING WITH CONCURRENT MAP!
-   }
-
-}


Mime
View raw message