activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [3/6] activemq-artemis git commit: ARTEMIS-906 Memory Mapped JournalType
Date Fri, 03 Feb 2017 14:30:09 GMT
ARTEMIS-906 Memory Mapped JournalType


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/aacddfda
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/aacddfda
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/aacddfda

Branch: refs/heads/master
Commit: aacddfda61804b203dc8b3efdebafa9384662e22
Parents: ef8cb60
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Fri Dec 16 11:12:22 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Feb 3 09:04:57 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/cli/commands/Create.java   |   7 +-
 .../cli/commands/util/SyncCalculation.java      |  12 +
 .../UnpooledUnsafeDirectByteBufWrapper.java     | 371 ++++++++++++++++++
 .../artemis/core/io/mapped/MappedFile.java      |  95 +++--
 .../core/io/mapped/MappedSequentialFile.java    |  92 ++---
 .../io/mapped/MappedSequentialFileFactory.java  |  75 ++--
 .../core/io/mapped/TimedSequentialFile.java     | 377 +++++++++++++++++++
 .../artemis/core/io/JournalTptBenchmark.java    | 208 ++++++++++
 .../core/io/SequentialFileTptBenchmark.java     | 203 ++++++++++
 .../impl/journal/JournalStorageManager.java     |   6 +
 .../core/server/ActiveMQServerLogger.java       |   6 +-
 .../artemis/core/server/JournalType.java        |   1 +
 .../resources/schema/artemis-configuration.xsd  |   1 +
 .../test/resources/artemis-configuration.xsd    |   1 +
 .../integration/journal/AIOJournalImplTest.java |   3 +-
 .../journal/MappedImportExportTest.java         |  30 ++
 .../journal/MappedJournalCompactTest.java       |  37 ++
 .../journal/MappedJournalImplTest.java          |  43 +++
 .../MappedSequentialFileFactoryTest.java        | 184 +++++++++
 .../journal/ValidateTransactionHealthTest.java  |  37 +-
 20 files changed, 1667 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index 29635e6..cd42f80 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -216,6 +216,9 @@ public class Create extends InputAbstract {
    @Option(name = "--nio", description = "sets the journal as nio.")
    boolean nio;
 
+   @Option(name = "--mapped", description = "Sets the journal as mapped.")
+   boolean mapped;
+
    // this is used by the setupJournalType method
    private JournalType journalType;
 
@@ -797,7 +800,7 @@ public class Create extends InputAbstract {
    }
 
    private void setupJournalType() {
-      int countJournalTypes = countBoolean(aio, nio);
+      int countJournalTypes = countBoolean(aio, nio, mapped);
       if (countJournalTypes > 1) {
          throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
       }
@@ -814,6 +817,8 @@ public class Create extends InputAbstract {
          journalType = JournalType.ASYNCIO;
       } else if (nio) {
          journalType = JournalType.NIO;
+      } else if (mapped) {
+         journalType = JournalType.MAPPED;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index ae7a8ca..6563278 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -24,9 +24,11 @@ import java.text.DecimalFormat;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.JournalType;
@@ -186,6 +188,16 @@ public class SyncCalculation {
             factory.start();
             ((AIOSequentialFileFactory) factory).disableBufferReuse();
             return factory;
+         case MAPPED:
+            factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() {
+               @Override
+               public void onIOException(Throwable code, String message, SequentialFile file) {
+
+               }
+            }, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync);
+
+            factory.start();
+            return factory;
          default:
             throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
new file mode 100644
index 0000000..0da33c6
--- /dev/null
+++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * A wrapper around an NIO direct {@link ByteBuffer}.
+ * Only ByteBuffer manipulation operations are supported.
+ * It is best suited for encoding/decoding purposes only.
+ */
+public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
+
+   private ByteBuffer buffer;
+   private long memoryAddress;
+
+   /**
+    * Creates an empty wrapper with no backing buffer; use {@link #wrap(ByteBuffer, int, int)} to attach one.
+    */
+   public UnpooledUnsafeDirectByteBufWrapper() {
+      super(0);
+      this.buffer = null;
+      this.memoryAddress = 0L;
+   }
+
+   public void wrap(ByteBuffer buffer, int srcIndex, int length) {
+      if (buffer != null) {
+         this.buffer = buffer;
+         this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
+         clear();
+         maxCapacity(length);
+      } else {
+         reset();
+      }
+   }
+
+   public void reset() {
+      this.buffer = null;
+      this.memoryAddress = 0L;
+      clear();
+      maxCapacity(0);
+   }
+
+   @Override
+   public boolean isDirect() {
+      return true;
+   }
+
+   @Override
+   public int capacity() {
+      return maxCapacity();
+   }
+
+   @Override
+   public ByteBuf capacity(int newCapacity) {
+      if (newCapacity != maxCapacity()) {
+         throw new IllegalArgumentException("can't set a capacity different from the max allowed one");
+      }
+      return this;
+   }
+
+   @Override
+   public ByteBufAllocator alloc() {
+      return null;
+   }
+
+   @Override
+   public ByteOrder order() {
+      return ByteOrder.BIG_ENDIAN;
+   }
+
+   @Override
+   public boolean hasArray() {
+      return false;
+   }
+
+   @Override
+   public byte[] array() {
+      throw new UnsupportedOperationException("direct buffer");
+   }
+
+   @Override
+   public int arrayOffset() {
+      throw new UnsupportedOperationException("direct buffer");
+   }
+
+   @Override
+   public boolean hasMemoryAddress() {
+      return true;
+   }
+
+   @Override
+   public long memoryAddress() {
+      return memoryAddress;
+   }
+
+   @Override
+   protected byte _getByte(int index) {
+      return UnsafeByteBufUtil.getByte(addr(index));
+   }
+
+   @Override
+   protected short _getShort(int index) {
+      return UnsafeByteBufUtil.getShort(addr(index));
+   }
+
+   @Override
+   protected short _getShortLE(int index) {
+      return UnsafeByteBufUtil.getShortLE(addr(index));
+   }
+
+   @Override
+   protected int _getUnsignedMedium(int index) {
+      return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
+   }
+
+   @Override
+   protected int _getUnsignedMediumLE(int index) {
+      return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
+   }
+
+   @Override
+   protected int _getInt(int index) {
+      return UnsafeByteBufUtil.getInt(addr(index));
+   }
+
+   @Override
+   protected int _getIntLE(int index) {
+      return UnsafeByteBufUtil.getIntLE(addr(index));
+   }
+
+   @Override
+   protected long _getLong(int index) {
+      return UnsafeByteBufUtil.getLong(addr(index));
+   }
+
+   @Override
+   protected long _getLongLE(int index) {
+      return UnsafeByteBufUtil.getLongLE(addr(index));
+   }
+
+   @Override
+   public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+      UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+      return this;
+   }
+
+   @Override
+   public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+      UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
+      return this;
+   }
+
+   @Override
+   public ByteBuf getBytes(int index, ByteBuffer dst) {
+      UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
+      return this;
+   }
+
+   @Override
+   public ByteBuf readBytes(ByteBuffer dst) {
+      int length = dst.remaining();
+      checkReadableBytes(length);
+      getBytes(readerIndex, dst);
+      readerIndex += length;
+      return this;
+   }
+
+   @Override
+   protected void _setByte(int index, int value) {
+      UnsafeByteBufUtil.setByte(addr(index), value);
+   }
+
+   @Override
+   protected void _setShort(int index, int value) {
+      UnsafeByteBufUtil.setShort(addr(index), value);
+   }
+
+   @Override
+   protected void _setShortLE(int index, int value) {
+      UnsafeByteBufUtil.setShortLE(addr(index), value);
+   }
+
+   @Override
+   protected void _setMedium(int index, int value) {
+      UnsafeByteBufUtil.setMedium(addr(index), value);
+   }
+
+   @Override
+   protected void _setMediumLE(int index, int value) {
+      UnsafeByteBufUtil.setMediumLE(addr(index), value);
+   }
+
+   @Override
+   protected void _setInt(int index, int value) {
+      UnsafeByteBufUtil.setInt(addr(index), value);
+   }
+
+   @Override
+   protected void _setIntLE(int index, int value) {
+      UnsafeByteBufUtil.setIntLE(addr(index), value);
+   }
+
+   @Override
+   protected void _setLong(int index, long value) {
+      UnsafeByteBufUtil.setLong(addr(index), value);
+   }
+
+   @Override
+   protected void _setLongLE(int index, long value) {
+      UnsafeByteBufUtil.setLongLE(addr(index), value);
+   }
+
+   @Override
+   public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+      UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+      return this;
+   }
+
+   @Override
+   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+      UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
+      return this;
+   }
+
+   @Override
+   public ByteBuf setBytes(int index, ByteBuffer src) {
+      UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
+      return this;
+   }
+
+   @Override
+   @Deprecated
+   public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int readBytes(GatheringByteChannel out, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int readBytes(FileChannel out, long position, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int setBytes(int index, InputStream in, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   public int nioBufferCount() {
+      return 1;
+   }
+
+   @Override
+   @Deprecated
+   public ByteBuffer[] nioBuffers(int index, int length) {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   public ByteBuf copy(int index, int length) {
+
+      throw new UnsupportedOperationException("unsupported!");
+
+   }
+
+   @Override
+   @Deprecated
+   public ByteBuffer internalNioBuffer(int index, int length) {
+      throw new UnsupportedOperationException("cannot access directly the wrapped buffer!");
+   }
+
+   @Override
+   @Deprecated
+   public ByteBuffer nioBuffer(int index, int length) {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   @Deprecated
+   protected void deallocate() {
+      //NO_OP
+   }
+
+   @Override
+   public ByteBuf unwrap() {
+      return null;
+   }
+
+   private long addr(int index) {
+      return memoryAddress + index;
+   }
+
+   @Override
+   @Deprecated
+   protected SwappedByteBuf newSwappedByteBuf() {
+      throw new UnsupportedOperationException("unsupported!");
+   }
+
+   @Override
+   public ByteBuf setZero(int index, int length) {
+      UnsafeByteBufUtil.setZero(this, addr(index), index, length);
+      return this;
+   }
+
+   @Override
+   public ByteBuf writeZero(int length) {
+      ensureWritable(length);
+      int wIndex = writerIndex;
+      setZero(wIndex, length);
+      writerIndex = wIndex + length;
+      return this;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
index 0aa9866..adfc4fe 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
@@ -20,18 +20,19 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.MappedByteBuffer;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
 import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
 
 final class MappedFile implements AutoCloseable {
 
-   private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder());
-
    private final MappedByteBufferCache cache;
-   private final int zerosMaxPage;
+   private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
+   private final ChannelBufferWrapper channelBufferWrapper;
    private MappedByteBuffer lastMapped;
    private long lastMappedStart;
    private long lastMappedLimit;
@@ -45,7 +46,8 @@ final class MappedFile implements AutoCloseable {
       this.lastMappedLimit = -1;
       this.position = 0;
       this.length = this.cache.fileSize();
-      this.zerosMaxPage = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes()));
+      this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
+      this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
    }
 
    public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
@@ -58,29 +60,33 @@ final class MappedFile implements AutoCloseable {
 
    private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
       if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
-         try {
-            final int index = cache.indexFor(offset);
-            final long mappedPosition = cache.mappedPositionFor(index);
-            final long mappedLimit = cache.mappedLimitFor(mappedPosition);
-            if (offset + bytes > mappedLimit) {
-               throw new IOException("mapping overflow!");
-            }
-            lastMapped = cache.acquireMappedByteBuffer(index);
-            lastMappedStart = mappedPosition;
-            lastMappedLimit = mappedLimit;
-            final int bufferPosition = (int) (offset - mappedPosition);
-            return bufferPosition;
-         } catch (IllegalStateException e) {
-            throw new IOException(e);
-         } catch (IllegalArgumentException e) {
-            throw new BufferUnderflowException();
-         }
+         return updateOffset(offset, bytes);
       } else {
          final int bufferPosition = (int) (offset - lastMappedStart);
          return bufferPosition;
       }
    }
 
+   private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
+      try {
+         final int index = cache.indexFor(offset);
+         final long mappedPosition = cache.mappedPositionFor(index);
+         final long mappedLimit = cache.mappedLimitFor(mappedPosition);
+         if (offset + bytes > mappedLimit) {
+            throw new IOException("mapping overflow!");
+         }
+         lastMapped = cache.acquireMappedByteBuffer(index);
+         lastMappedStart = mappedPosition;
+         lastMappedLimit = mappedLimit;
+         final int bufferPosition = (int) (offset - mappedPosition);
+         return bufferPosition;
+      } catch (IllegalStateException e) {
+         throw new IOException(e);
+      } catch (IllegalArgumentException e) {
+         throw new BufferUnderflowException();
+      }
+   }
+
    public void force() {
       if (lastMapped != null) {
          lastMapped.force();
@@ -180,6 +186,26 @@ final class MappedFile implements AutoCloseable {
    }
 
    /**
+    * Writes the encoded form of the given {@link EncodingSupport} to this file.
+    * <p>
+    * Bytes are written starting at this file's current position, which is then advanced by the encoded size.
+    */
+   public void write(EncodingSupport encodingSupport) throws IOException {
+      final int encodedSize = encodingSupport.getEncodeSize();
+      final int bufferPosition = checkOffset(position, encodedSize);
+      this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize);
+      try {
+         encodingSupport.encode(this.channelBufferWrapper);
+      } finally {
+         this.byteBufWrapper.reset();
+      }
+      position += encodedSize;
+      if (position > this.length) {
+         this.length = position;
+      }
+   }
+
+   /**
     * Writes a sequence of bytes to this file from the given buffer.
     * <p>
     * <p> Bytes are written starting at this file's current position,
@@ -273,21 +299,20 @@ final class MappedFile implements AutoCloseable {
     * <p> Bytes are written starting at this file's current position,
     */
    public void zeros(long offset, int count) throws IOException {
-      final long targetOffset = offset + count;
-      final int zerosBulkCopies = count / zerosMaxPage;
-      final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE);
-      for (int i = 0; i < zerosBulkCopies; i++) {
-         final int bufferPosition = checkOffset(offset, zerosMaxPage);
+      while (count > 0) {
+         //do not need to validate the bytes count
+         final int bufferPosition = checkOffset(offset, 0);
+         final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity());
+         final int zeros = endZerosPosition - bufferPosition;
          final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
-         PlatformDependent.copyMemory(srcAddress, destAddress, zerosMaxPage);
-         offset += zerosMaxPage;
+         PlatformDependent.setMemory(destAddress, zeros, (byte) 0);
+         offset += zeros;
+         count -= zeros;
+         //TODO need to call force on each write?
+         //this.force();
       }
-      final int remainingToBeZeroes = (int) (targetOffset - offset);
-      final int bufferPosition = checkOffset(offset, remainingToBeZeroes);
-      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
-      PlatformDependent.copyMemory(srcAddress, destAddress, remainingToBeZeroes);
-      if (targetOffset > this.length) {
-         this.length = targetOffset;
+      if (offset > this.length) {
+         this.length = offset;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index a959113..12e359c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -20,16 +20,13 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -44,12 +41,11 @@ final class MappedSequentialFile implements SequentialFile {
    private final long chunkBytes;
    private final long overlapBytes;
    private final IOCriticalErrorListener criticalErrorListener;
+   private final MappedSequentialFileFactory factory;
    private File file;
    private File absoluteFile;
    private String fileName;
    private MappedFile mappedFile;
-   private ActiveMQBuffer pooledActiveMQBuffer;
-   private final MappedSequentialFileFactory factory;
 
    MappedSequentialFile(MappedSequentialFileFactory factory,
                         final File directory,
@@ -65,19 +61,24 @@ final class MappedSequentialFile implements SequentialFile {
       this.chunkBytes = chunkBytes;
       this.overlapBytes = overlapBytes;
       this.mappedFile = null;
-      this.pooledActiveMQBuffer = null;
       this.criticalErrorListener = criticalErrorListener;
    }
 
    private void checkIsOpen() {
       if (!isOpen()) {
-         throw new IllegalStateException("must be open!");
+         throw new IllegalStateException("File not opened!");
+      }
+   }
+
+   private void checkIsOpen(IOCallback callback) {
+      if (!isOpen()) {
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
       }
    }
 
    private void checkIsNotOpen() {
       if (isOpen()) {
-         throw new IllegalStateException("must be closed!");
+         throw new IllegalStateException("File opened!");
       }
    }
 
@@ -101,7 +102,6 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public void open(int maxIO, boolean useExecutor) throws IOException {
       //ignore maxIO e useExecutor
-      ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!");
       this.open();
    }
 
@@ -134,7 +134,7 @@ final class MappedSequentialFile implements SequentialFile {
 
    @Override
    public void delete() {
-      checkIsNotOpen();
+      close();
       if (file.exists() && !file.delete()) {
          ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
       }
@@ -142,10 +142,10 @@ final class MappedSequentialFile implements SequentialFile {
 
    @Override
    public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException {
-      checkIsOpen();
       if (callback == null) {
          throw new NullPointerException("callback parameter need to be set");
       }
+      checkIsOpen(callback);
       try {
          final ByteBuf byteBuf = bytes.byteBuf();
          final int writerIndex = byteBuf.writerIndex();
@@ -182,34 +182,16 @@ final class MappedSequentialFile implements SequentialFile {
       }
    }
 
-   private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) {
-      if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) {
-         this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder()));
-      } else {
-         this.pooledActiveMQBuffer.clear();
-      }
-      return pooledActiveMQBuffer;
-   }
-
    @Override
    public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException {
-      checkIsOpen();
       if (callback == null) {
          throw new NullPointerException("callback parameter need to be set");
       }
+      checkIsOpen(callback);
       try {
-         final int encodedSize = bytes.getEncodeSize();
-         final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
-         bytes.encode(outBuffer);
-         final ByteBuf byteBuf = outBuffer.byteBuf();
-         final int writerIndex = byteBuf.writerIndex();
-         final int readerIndex = byteBuf.readerIndex();
-         final int readableBytes = writerIndex - readerIndex;
-         if (readableBytes > 0) {
-            this.mappedFile.write(byteBuf, readerIndex, readableBytes);
-            if (factory.isDatasync() && sync) {
-               this.mappedFile.force();
-            }
+         this.mappedFile.write(bytes);
+         if (factory.isDatasync() && sync) {
+            this.mappedFile.force();
          }
          callback.done();
       } catch (IOException e) {
@@ -224,33 +206,26 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public void write(EncodingSupport bytes, boolean sync) throws IOException {
       checkIsOpen();
-      final int encodedSize = bytes.getEncodeSize();
-      final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
-      bytes.encode(outBuffer);
-      final ByteBuf byteBuf = outBuffer.byteBuf();
-      final int writerIndex = byteBuf.writerIndex();
-      final int readerIndex = byteBuf.readerIndex();
-      final int readableBytes = writerIndex - readerIndex;
-      if (readableBytes > 0) {
-         this.mappedFile.write(byteBuf, readerIndex, readableBytes);
-         if (factory.isDatasync() && sync) {
-            this.mappedFile.force();
-         }
+      this.mappedFile.write(bytes);
+      if (factory.isDatasync() && sync) {
+         this.mappedFile.force();
       }
    }
 
    @Override
    public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
-      checkIsOpen();
       if (callback == null) {
          throw new NullPointerException("callback parameter need to be set");
       }
+      checkIsOpen(callback);
       try {
          final int position = bytes.position();
          final int limit = bytes.limit();
          final int remaining = limit - position;
          if (remaining > 0) {
             this.mappedFile.write(bytes, position, remaining);
+            final int newPosition = position + remaining;
+            bytes.position(newPosition);
             if (factory.isDatasync() && sync) {
                this.mappedFile.force();
             }
@@ -273,6 +248,8 @@ final class MappedSequentialFile implements SequentialFile {
       final int remaining = limit - position;
       if (remaining > 0) {
          this.mappedFile.write(bytes, position, remaining);
+         final int newPosition = position + remaining;
+         bytes.position(newPosition);
          if (factory.isDatasync() && sync) {
             this.mappedFile.force();
          }
@@ -281,10 +258,10 @@ final class MappedSequentialFile implements SequentialFile {
 
    @Override
    public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
-      checkIsOpen();
       if (callback == null) {
          throw new NullPointerException("callback parameter need to be set");
       }
+      checkIsOpen(callback);
       try {
          final int position = bytes.position();
          final int limit = bytes.limit();
@@ -296,8 +273,10 @@ final class MappedSequentialFile implements SequentialFile {
             bytes.flip();
             callback.done();
             return bytesRead;
+         } else {
+            callback.done();
+            return 0;
          }
-         return 0;
       } catch (IOException e) {
          if (this.criticalErrorListener != null) {
             this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
@@ -360,7 +339,14 @@ final class MappedSequentialFile implements SequentialFile {
 
    @Override
    public void renameTo(String newFileName) throws Exception {
-      checkIsNotOpen();
+      try {
+         close();
+      } catch (Exception e) {
+         if (e instanceof IOException) {
+            factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+         throw e;
+      }
       if (this.fileName == null) {
          this.fileName = this.file.getName();
       }
@@ -388,14 +374,10 @@ final class MappedSequentialFile implements SequentialFile {
       if (dstFile.isOpen()) {
          throw new IllegalArgumentException("dstFile must be closed too");
       }
-      try (RandomAccessFile src = new RandomAccessFile(file, "rw");
-           FileChannel srcChannel = src.getChannel();
-           FileLock srcLock = srcChannel.lock()) {
+      try (RandomAccessFile src = new RandomAccessFile(file, "rw"); FileChannel srcChannel = src.getChannel(); FileLock srcLock = srcChannel.lock()) {
          final long readableBytes = srcChannel.size();
          if (readableBytes > 0) {
-            try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
-                 FileChannel dstChannel = dst.getChannel();
-                 FileLock dstLock = dstChannel.lock()) {
+            try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); FileChannel dstChannel = dst.getChannel(); FileLock dstLock = dstChannel.lock()) {
                final long oldLength = dst.length();
                final long newLength = oldLength + readableBytes;
                dst.setLength(newLength);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 55bb2bf..c4b7d30 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -29,30 +28,42 @@ import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 
 public final class MappedSequentialFileFactory implements SequentialFileFactory {
 
    private static long DEFAULT_BLOCK_SIZE = 64L << 20;
    private final File directory;
    private final IOCriticalErrorListener criticalErrorListener;
+   private final TimedBuffer timedBuffer;
    private long chunkBytes;
    private long overlapBytes;
    private boolean useDataSync;
+   private boolean supportCallbacks;
 
-   public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
+   protected volatile int alignment = -1;
+
+   public MappedSequentialFileFactory(File directory,
+                                      IOCriticalErrorListener criticalErrorListener,
+                                      boolean supportCallbacks) {
       this.directory = directory;
       this.criticalErrorListener = criticalErrorListener;
       this.chunkBytes = DEFAULT_BLOCK_SIZE;
       this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
+      this.useDataSync = true;
+      this.timedBuffer = null;
+      this.supportCallbacks = supportCallbacks;
+   }
+
+   public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
+      this(directory, criticalErrorListener, false);
    }
 
    public MappedSequentialFileFactory(File directory) {
-      this.directory = directory;
-      this.criticalErrorListener = null;
-      this.chunkBytes = DEFAULT_BLOCK_SIZE;
-      this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
+      this(directory, null);
    }
 
+
    public long chunkBytes() {
       return chunkBytes;
    }
@@ -73,7 +84,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public SequentialFile createSequentialFile(String fileName) {
-      return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+      final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+      if (this.timedBuffer == null) {
+         return mappedSequentialFile;
+      } else {
+         return new TimedSequentialFile(this, mappedSequentialFile);
+      }
    }
 
    @Override
@@ -89,17 +105,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public int getMaxIO() {
-      return 0;
+      return 1;
    }
 
    @Override
    public List<String> listFiles(final String extension) throws Exception {
-      final FilenameFilter extensionFilter = new FilenameFilter() {
-         @Override
-         public boolean accept(final File file, final String name) {
-            return name.endsWith("." + extension);
-         }
-      };
+      final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
       final String[] fileNames = directory.list(extensionFilter);
       if (fileNames == null) {
          return Collections.EMPTY_LIST;
@@ -109,7 +120,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public boolean isSupportsCallbacks() {
-      return false;
+      return this.supportCallbacks;
    }
 
    @Override
@@ -121,7 +132,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public ByteBuffer allocateDirectBuffer(final int size) {
-      return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
+      return ByteBuffer.allocateDirect(size);
    }
 
    @Override
@@ -131,7 +142,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public ByteBuffer newBuffer(final int size) {
-      return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
+      return ByteBuffer.allocate(size);
    }
 
    @Override
@@ -143,17 +154,23 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public void activateBuffer(SequentialFile file) {
-
+      if (timedBuffer != null) {
+         file.setTimedBuffer(timedBuffer);
+      }
    }
 
    @Override
    public void deactivateBuffer() {
-
+      if (timedBuffer != null) {
+         // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
+         timedBuffer.flush();
+         timedBuffer.setObserver(null);
+      }
    }
 
    @Override
    public ByteBuffer wrapBuffer(final byte[] bytes) {
-      return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+      return ByteBuffer.wrap(bytes);
    }
 
    @Override
@@ -162,8 +179,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
    }
 
    @Override
-   public SequentialFileFactory setAlignment(int alignment) {
-      // no op
+   public MappedSequentialFileFactory setAlignment(int alignment) {
+      this.alignment = alignment;
       return this;
    }
 
@@ -179,7 +196,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public void clearBuffer(final ByteBuffer buffer) {
-      buffer.clear();
       if (buffer.isDirect()) {
          BytesUtils.zerosDirect(buffer);
       } else if (buffer.hasArray()) {
@@ -193,16 +209,21 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
             buffer.put(i, (byte) 0);
          }
       }
+      buffer.rewind();
    }
 
    @Override
    public void start() {
-
+      if (timedBuffer != null) {
+         timedBuffer.start();
+      }
    }
 
    @Override
    public void stop() {
-
+      if (timedBuffer != null) {
+         timedBuffer.stop();
+      }
    }
 
    @Override
@@ -215,6 +236,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public void flush() {
-
+      if (timedBuffer != null) {
+         timedBuffer.flush();
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
new file mode 100644
index 0000000..d376d7d
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.core.io.DummyCallback;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+/**
+ * A {@link SequentialFile} decorator that can route writes through a {@link TimedBuffer}.
+ * <p>
+ * While a timed buffer is attached (see {@link #setTimedBuffer(TimedBuffer)}) the {@code write(...)}
+ * overloads append to that buffer and {@link #fits(int)} asks the buffer; when no buffer is attached
+ * they go straight to the wrapped file. Every other operation is delegated to the wrapped file unchanged.
+ */
+final class TimedSequentialFile implements SequentialFile {
+
+   private final SequentialFileFactory factory;
+   private final SequentialFile sequentialFile;
+   private final LocalBufferObserver observer;
+   // one reusable blocking callback per calling thread, used by the synchronous write(...) overloads
+   private final ThreadLocal<ResettableIOCallback> callbackPool;
+   private TimedBuffer timedBuffer;
+
+   /**
+    * @param factory        factory used by the buffer observer to size and allocate buffers
+    * @param sequentialFile the file every operation is ultimately delegated to
+    */
+   TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) {
+      this.sequentialFile = sequentialFile;
+      this.factory = factory;
+      this.observer = new LocalBufferObserver();
+      this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new);
+   }
+
+   @Override
+   public boolean isOpen() {
+      return this.sequentialFile.isOpen();
+   }
+
+   @Override
+   public boolean exists() {
+      return this.sequentialFile.exists();
+   }
+
+   @Override
+   public void open() throws Exception {
+      this.sequentialFile.open();
+   }
+
+   @Override
+   public void open(int maxIO, boolean useExecutor) throws Exception {
+      this.sequentialFile.open(maxIO, useExecutor);
+   }
+
+   /**
+    * Asks the attached timed buffer whether {@code size} fits; without a buffer the wrapped file decides.
+    */
+   @Override
+   public boolean fits(int size) {
+      if (timedBuffer == null) {
+         return this.sequentialFile.fits(size);
+      } else {
+         return timedBuffer.checkSize(size);
+      }
+   }
+
+   @Override
+   public int calculateBlockStart(int position) throws Exception {
+      return this.sequentialFile.calculateBlockStart(position);
+   }
+
+   @Override
+   public String getFileName() {
+      return this.sequentialFile.getFileName();
+   }
+
+   @Override
+   public void fill(int size) throws Exception {
+      this.sequentialFile.fill(size);
+   }
+
+   @Override
+   public void delete() throws IOException, InterruptedException, ActiveMQException {
+      this.sequentialFile.delete();
+   }
+
+   /**
+    * Appends {@code bytes} to the timed buffer when one is attached, otherwise writes to the wrapped file.
+    */
+   @Override
+   public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
+      if (this.timedBuffer != null) {
+         this.timedBuffer.addBytes(bytes, sync, callback);
+      } else {
+         this.sequentialFile.write(bytes, sync, callback);
+      }
+   }
+
+   /**
+    * Callback-less write. With a timed buffer attached, a {@code sync} write blocks the caller on its
+    * per-thread {@link ResettableIOCallback} until completion (or an error) is reported; a non-sync
+    * write is queued with the shared {@link DummyCallback}. Without a buffer the wrapped file is used.
+    */
+   @Override
+   public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
+      if (sync) {
+         if (this.timedBuffer != null) {
+            final ResettableIOCallback callback = callbackPool.get();
+            try {
+               this.timedBuffer.addBytes(bytes, true, callback);
+               callback.waitCompletion();
+            } finally {
+               // the callback is pooled per thread: always clear any recorded error before reuse
+               callback.reset();
+            }
+         } else {
+            this.sequentialFile.write(bytes, true);
+         }
+      } else {
+         if (this.timedBuffer != null) {
+            this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
+         } else {
+            this.sequentialFile.write(bytes, false);
+         }
+      }
+   }
+
+   /**
+    * Appends {@code bytes} to the timed buffer when one is attached, otherwise writes to the wrapped file.
+    */
+   @Override
+   public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
+      if (this.timedBuffer != null) {
+         this.timedBuffer.addBytes(bytes, sync, callback);
+      } else {
+         this.sequentialFile.write(bytes, sync, callback);
+      }
+   }
+
+   /**
+    * Callback-less write; same routing as {@link #write(ActiveMQBuffer, boolean)} but for an
+    * {@link EncodingSupport} payload.
+    */
+   @Override
+   public void write(EncodingSupport bytes, boolean sync) throws Exception {
+      if (sync) {
+         if (this.timedBuffer != null) {
+            final ResettableIOCallback callback = callbackPool.get();
+            try {
+               this.timedBuffer.addBytes(bytes, true, callback);
+               callback.waitCompletion();
+            } finally {
+               // the callback is pooled per thread: always clear any recorded error before reuse
+               callback.reset();
+            }
+         } else {
+            this.sequentialFile.write(bytes, true);
+         }
+      } else {
+         if (this.timedBuffer != null) {
+            this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
+         } else {
+            this.sequentialFile.write(bytes, false);
+         }
+      }
+   }
+
+   // writeDirect always bypasses the timed buffer
+   @Override
+   public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+      this.sequentialFile.writeDirect(bytes, sync, callback);
+   }
+
+   @Override
+   public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+      this.sequentialFile.writeDirect(bytes, sync);
+   }
+
+   @Override
+   public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+      return this.sequentialFile.read(bytes, callback);
+   }
+
+   @Override
+   public int read(ByteBuffer bytes) throws Exception {
+      return this.sequentialFile.read(bytes);
+   }
+
+   @Override
+   public void position(long pos) throws IOException {
+      this.sequentialFile.position(pos);
+   }
+
+   @Override
+   public long position() {
+      return this.sequentialFile.position();
+   }
+
+   @Override
+   public void close() throws Exception {
+      this.sequentialFile.close();
+   }
+
+   @Override
+   public void sync() throws IOException {
+      this.sequentialFile.sync();
+   }
+
+   @Override
+   public long size() throws Exception {
+      return this.sequentialFile.size();
+   }
+
+   @Override
+   public void renameTo(String newFileName) throws Exception {
+      this.sequentialFile.renameTo(newFileName);
+   }
+
+   /**
+    * Returns a new wrapper around a clone of the wrapped file; the clone starts with no timed buffer attached.
+    */
+   @Override
+   public SequentialFile cloneFile() {
+      return new TimedSequentialFile(factory, this.sequentialFile.cloneFile());
+   }
+
+   @Override
+   public void copyTo(SequentialFile newFileName) throws Exception {
+      this.sequentialFile.copyTo(newFileName);
+   }
+
+   /**
+    * Attaches {@code buffer} (or detaches when {@code null}). The previously attached buffer, if any,
+    * has its observer cleared; the new one gets this file's observer installed.
+    */
+   @Override
+   public void setTimedBuffer(TimedBuffer buffer) {
+      if (this.timedBuffer != null) {
+         this.timedBuffer.setObserver(null);
+      }
+      this.timedBuffer = buffer;
+      if (buffer != null) {
+         buffer.setObserver(this.observer);
+      }
+   }
+
+   @Override
+   public File getJavaFile() {
+      return this.sequentialFile.getJavaFile();
+   }
+
+   /**
+    * Reusable blocking callback: the waiting thread and the completing thread meet on a two-party
+    * {@link CyclicBarrier}. An error reported through {@link #onError(int, String)} is recorded
+    * before the rendezvous and rethrown by {@link #waitCompletion()}.
+    * <p>
+    * NOTE(review): {@link #reset()} only clears the recorded error; it does not repair a barrier
+    * that was broken by an interrupted wait.
+    */
+   private static final class ResettableIOCallback implements IOCallback {
+
+      private final CyclicBarrier cyclicBarrier;
+      private int errorCode;
+      private String errorMessage;
+
+      ResettableIOCallback() {
+         this.cyclicBarrier = new CyclicBarrier(2);
+      }
+
+      /**
+       * Blocks until the other party calls {@link #done()} or {@link #onError(int, String)}.
+       *
+       * @throws ActiveMQException if an error was reported instead of a successful completion
+       */
+      public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException {
+         this.cyclicBarrier.await();
+         if (this.errorMessage != null) {
+            throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage);
+         }
+      }
+
+      /** Clears the recorded error so the instance can be reused for the next write. */
+      public void reset() {
+         this.errorCode = 0;
+         this.errorMessage = null;
+      }
+
+      @Override
+      public void done() {
+         awaitWaiter();
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+         // record the failure first so the waiter observes it once the barrier trips
+         this.errorCode = errorCode;
+         this.errorMessage = errorMessage;
+         awaitWaiter();
+      }
+
+      /**
+       * Meets the waiting thread on the barrier.
+       *
+       * @throws IllegalStateException if the wait is interrupted or the barrier is broken
+       */
+      private void awaitWaiter() {
+         try {
+            this.cyclicBarrier.await();
+         } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller: wrapping alone would silently drop it
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+         } catch (BrokenBarrierException e) {
+            throw new IllegalStateException(e);
+         }
+      }
+   }
+
+   /**
+    * Fans a single completion or error notification out to a list of callbacks. A throwable raised
+    * by one delegate is logged and does not prevent the remaining delegates from being notified.
+    */
+   private static final class DelegateCallback implements IOCallback {
+
+      final List<IOCallback> delegates;
+
+      private DelegateCallback() {
+         this.delegates = new ArrayList<>();
+      }
+
+      /** Returns the live, mutable delegate list (not a copy). */
+      public List<IOCallback> delegates() {
+         return this.delegates;
+      }
+
+      @Override
+      public void done() {
+         final int size = delegates.size();
+         for (int i = 0; i < size; i++) {
+            try {
+               final IOCallback callback = delegates.get(i);
+               callback.done();
+            } catch (Throwable e) {
+               ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
+            }
+         }
+      }
+
+      @Override
+      public void onError(final int errorCode, final String errorMessage) {
+         for (IOCallback callback : delegates) {
+            try {
+               callback.onError(errorCode, errorMessage);
+            } catch (Throwable e) {
+               ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
+            }
+         }
+      }
+   }
+
+   /**
+    * Observer installed on the attached {@link TimedBuffer}: it writes flushed buffers to the
+    * wrapped file and answers the buffer's sizing questions using the enclosing file and factory.
+    */
+   private final class LocalBufferObserver implements TimedBufferObserver {
+
+      // reused per flushing thread to avoid allocating a delegate list on every flush
+      private final ThreadLocal<DelegateCallback> callbacksPool = ThreadLocal.withInitial(DelegateCallback::new);
+
+      /**
+       * Flips {@code buffer} and writes it to the wrapped file with {@code writeDirect}, notifying
+       * {@code callbacks} through a pooled {@link DelegateCallback}. An empty buffer causes no write:
+       * each callback is completed immediately.
+       */
+      @Override
+      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
+         buffer.flip();
+         if (buffer.limit() == 0) {
+            //if there are no bytes to flush, can release the callbacks
+            final int size = callbacks.size();
+            for (int i = 0; i < size; i++) {
+               callbacks.get(i).done();
+            }
+         } else {
+            final DelegateCallback delegateCallback = callbacksPool.get();
+            final int size = callbacks.size();
+            final List<IOCallback> delegates = delegateCallback.delegates();
+            for (int i = 0; i < size; i++) {
+               delegates.add(callbacks.get(i));
+            }
+            try {
+               sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
+            } finally {
+               // NOTE(review): clearing here assumes writeDirect has already notified the
+               // delegate callback by the time it returns - confirm for every wrapped file type
+               delegates.clear();
+            }
+         }
+      }
+
+      /**
+       * Allocates a buffer through the factory; both {@code size} and {@code limit} are first
+       * passed through {@code factory.calculateBlockSize}.
+       */
+      @Override
+      public ByteBuffer newBuffer(final int size, final int limit) {
+         final int alignedSize = factory.calculateBlockSize(size);
+         final int alignedLimit = factory.calculateBlockSize(limit);
+         final ByteBuffer buffer = factory.newBuffer(alignedSize);
+         buffer.limit(alignedLimit);
+         return buffer;
+      }
+
+      /**
+       * Returns {@code size() - position()} of the wrapped file, capped at {@link Integer#MAX_VALUE}.
+       *
+       * @throws IllegalStateException wrapping any exception raised while querying the file
+       */
+      @Override
+      public int getRemainingBytes() {
+         try {
+            final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE);
+            return remaining;
+         } catch (Exception e) {
+            throw new IllegalStateException(e);
+         }
+      }
+
+      @Override
+      public String toString() {
+         return "TimedBufferObserver on file (" + getFileName() + ")";
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
new file mode 100644
index 0000000..b426219
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
+import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+
+/**
+ * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ */
+public class JournalTptBenchmark {
+
+   /**
+    * Runs the benchmark with the hard-coded settings below: builds the factory for the selected
+    * {@link Type}, creates, starts and loads a {@link JournalImpl}, performs one warm-up round
+    * followed by {@code tests} measured rounds, and prints the throughput of each round.
+    * <p>
+    * NOTE: the working directory ({@code "./"}) is used as the journal directory and the cleanup
+    * step attempts to delete every entry returned by {@code listFiles()} on it - run this from a
+    * scratch directory.
+    */
+   public static void main(String[] args) throws Exception {
+      final boolean useDefaultIoExecutor = true;
+      final int fileSize = 1024 * 1024;
+      final boolean dataSync = true;
+      final Type type = Type.Mapped;
+      final int tests = 5;
+      final int warmup = 20_000;
+      final int measurements = 20_000;
+      final int msgSize = 100;
+      final byte[] msgContent = new byte[msgSize];
+      Arrays.fill(msgContent, (byte) 1);
+      final int totalMessages = (measurements * tests + warmup);
+      final File tmpDirectory = new File("./");
+      //using the default configuration when the broker starts!
+      final SequentialFileFactory factory;
+      switch (type) {
+
+         case Mapped:
+            final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
+            factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync);
+            break;
+         case Nio:
+            factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
+            break;
+         case Aio:
+            factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
+            //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
+            if (!LibaioContext.isLoaded()) {
+               throw new IllegalStateException("lib AIO not loaded!");
+            }
+            break;
+         default:
+            throw new AssertionError("unsupported case");
+      }
+
+      // multiply in long and narrow only the quotient, so larger settings cannot overflow int
+      int numFiles = (int) (((long) totalMessages * factory.calculateBlockSize(msgSize)) / fileSize);
+      if (numFiles < 2) {
+         numFiles = 2;
+      }
+      ExecutorService service = null;
+      final Journal journal;
+      if (useDefaultIoExecutor) {
+         journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO());
+         journal.start();
+      } else {
+         // every executor handed to the journal only enqueues; one dedicated thread drains all queues
+         final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
+         service = Executors.newSingleThreadExecutor();
+         journal = new JournalImpl(() -> new Executor() {
+
+            private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
+
+            {
+               tasks.add(taskQueue);
+            }
+
+            @Override
+            public void execute(Runnable command) {
+               // back off briefly while the bounded queue is full
+               while (!taskQueue.offer(command)) {
+                  LockSupport.parkNanos(1L);
+               }
+            }
+         }, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0);
+         journal.start();
+         service.execute(() -> {
+            final int size = tasks.size();
+            final int capacity = 1024;
+            while (!Thread.currentThread().isInterrupted()) {
+               for (int i = 0; i < size; i++) {
+                  final MpscArrayQueue<Runnable> runnables = tasks.get(i);
+                  for (int j = 0; j < capacity; j++) {
+                     final Runnable task = runnables.poll();
+                     if (task == null) {
+                        break;
+                     }
+                     try {
+                        task.run();
+                     } catch (Throwable t) {
+                        System.err.println(t);
+                     }
+                  }
+               }
+            }
+
+         });
+      }
+      try {
+         journal.load(new ArrayList<RecordInfo>(), null, null);
+      } catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+      try {
+         // fixed-size payload: always encodes msgSize bytes taken from msgContent
+         final EncodingSupport encodingSupport = new EncodingSupport() {
+            @Override
+            public int getEncodeSize() {
+               return msgSize;
+            }
+
+            @Override
+            public void encode(ActiveMQBuffer buffer) {
+               final int writerIndex = buffer.writerIndex();
+               buffer.setBytes(writerIndex, msgContent);
+               buffer.writerIndex(writerIndex + msgSize);
+            }
+
+            @Override
+            public void decode(ActiveMQBuffer buffer) {
+
+            }
+         };
+         long id = 1;
+         {
+            final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup);
+            id += warmup;
+            // the warm-up round performed `warmup` operations, so rate it with that count
+            System.out.println("warmup:" + (warmup * 1000_000_000L) / elapsed + " ops/sec");
+         }
+         for (int t = 0; t < tests; t++) {
+            final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements);
+            System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
+            // advance past the ids consumed by this round so the next round uses fresh ones
+            id += measurements;
+         }
+
+      } finally {
+         journal.stop();
+         if (service != null) {
+            service.shutdown();
+         }
+         final File[] fileToDeletes = tmpDirectory.listFiles();
+         System.out.println("Files to deletes" + Arrays.toString(fileToDeletes));
+         // listFiles() returns null on an I/O error or when the path is not a directory
+         if (fileToDeletes != null) {
+            Stream.of(fileToDeletes).forEach(File::delete);
+         }
+      }
+   }
+
+   /**
+    * Times {@code measurements} consecutive {@link #write} calls using ids {@code id},
+    * {@code id + 1}, ... A {@code System.gc()} request and a two second pause precede the timed
+    * section and are not included in the result.
+    *
+    * @return the elapsed time of the timed section, in nanoseconds
+    */
+   private static long writeMeasurements(long id,
+                                         Journal journal,
+                                         EncodingSupport encodingSupport,
+                                         int measurements) throws Exception {
+      System.gc();
+      TimeUnit.SECONDS.sleep(2);
+
+      final long start = System.nanoTime();
+      for (int i = 0; i < measurements; i++) {
+         write(id, journal, encodingSupport);
+         id++;
+      }
+      final long elapsed = System.nanoTime() - start;
+      return elapsed;
+   }
+
+   /**
+    * One benchmark operation: appends an add record without sync, then an update record for the
+    * same id with sync requested, and blocks until the update's callback completes.
+    */
+   private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
+      journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
+      final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback();
+      journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback);
+      ioCallback.waitCompletion();
+   }
+
+   /** Selects which {@link SequentialFileFactory} implementation {@code main} builds. */
+   private enum Type {
+
+      Mapped, Nio, Aio
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
new file mode 100644
index 0000000..7756a06
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.activemq.artemis.ArtemisConstants;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.jlibaio.LibaioContext;
+
+/**
+ * Throughput benchmark that writes fixed size messages on a single {@link SequentialFile} created by the
+ * Mapped, NIO or AIO {@link SequentialFileFactory} and prints the achieved ops/sec.
+ * <p>
+ * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ */
+public class SequentialFileTptBenchmark {
+
+   private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
+
+   /**
+    * Runs one warmup round followed by {@code tests} measured rounds, printing the ops/sec of each round.
+    * The benchmark parameters are the constants declared at the beginning of the method.
+    *
+    * @param args ignored
+    * @throws IllegalArgumentException if the file needed by the configured rounds does not fit into an {@code int}
+    * @throws IllegalStateException    if {@code Type.Aio} is selected and lib AIO is not loaded
+    */
+   public static void main(String[] args) throws Exception {
+      final boolean dataSync = true;
+      final boolean writeSync = true;
+      final Type type = Type.Mapped;
+      final int tests = 10;
+      final int warmup = 20_000;
+      final int measurements = 20_000;
+      final int msgSize = 100;
+      final byte[] msgContent = new byte[msgSize];
+      Arrays.fill(msgContent, (byte) 1);
+      final File tmpDirectory = new File("./");
+      //using the default configuration when the broker starts!
+      final SequentialFileFactory factory;
+      switch (type) {
+
+         case Mapped:
+            final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
+            //the mapped chunk is sized to host the whole benchmark file
+            final int chunkSize = requiredFileSize(mappedFactory.calculateBlockSize(msgSize), warmup, measurements);
+            factory = mappedFactory.chunkBytes(chunkSize).overlapBytes(0).setDatasync(dataSync);
+            break;
+         case Nio:
+            factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
+            break;
+         case Aio:
+            factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
+            //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
+            if (!LibaioContext.isLoaded()) {
+               throw new IllegalStateException("lib AIO not loaded!");
+            }
+            break;
+         default:
+            throw new AssertionError("unsupported case");
+      }
+      factory.start();
+      try {
+         final EncodingSupport encodingSupport = new EncodingSupport() {
+            @Override
+            public int getEncodeSize() {
+               return msgSize;
+            }
+
+            @Override
+            public void encode(ActiveMQBuffer buffer) {
+               final int writerIndex = buffer.writerIndex();
+               buffer.setBytes(writerIndex, msgContent);
+               buffer.writerIndex(writerIndex + msgSize);
+            }
+
+            @Override
+            public void decode(ActiveMQBuffer buffer) {
+
+            }
+         };
+         final int fileSize = requiredFileSize(factory.calculateBlockSize(msgSize), warmup, measurements);
+         final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat");
+         sequentialFile.getJavaFile().delete();
+         sequentialFile.getJavaFile().deleteOnExit();
+         sequentialFile.open();
+         final long startZeros = System.nanoTime();
+         sequentialFile.fill(fileSize);
+         final long elapsedZeros = System.nanoTime() - startZeros;
+         System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us");
+         try {
+            {
+               final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync);
+               //the warmup round performs "warmup" writes, hence its rate must be computed from that count
+               System.out.println("warmup:" + (warmup * 1000_000_000L) / elapsed + " ops/sec");
+            }
+            for (int t = 0; t < tests; t++) {
+               final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync);
+               System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
+            }
+         } finally {
+            sequentialFile.close();
+         }
+      } finally {
+         factory.stop();
+      }
+   }
+
+   /**
+    * Computes the size of the file needed to host the biggest round (warmup or measurements) of aligned messages.
+    *
+    * @param alignedMessageSize the size of a single message after the factory alignment
+    * @param warmup             the number of writes of the warmup round
+    * @param measurements       the number of writes of each measured round
+    * @return the required file size in bytes
+    * @throws IllegalArgumentException if the required size does not fit into an {@code int}
+    */
+   private static int requiredFileSize(int alignedMessageSize, int warmup, int measurements) {
+      //widen before multiplying: an int product would silently overflow and defeat the range check below
+      final long totalFileSize = (long) alignedMessageSize * Math.max(measurements, warmup);
+      if (totalFileSize > Integer.MAX_VALUE) {
+         throw new IllegalArgumentException("reduce measurements/warmup");
+      }
+      return (int) totalFileSize;
+   }
+
+   /**
+    * Rewinds the file to position 0, performs {@code measurements} writes and returns the elapsed time.
+    * A 2 seconds pause is issued before the clock is started; deactivating the factory buffer is part of
+    * the measured time.
+    *
+    * @return the elapsed time in nanoseconds
+    */
+   private static long writeMeasurements(SequentialFileFactory sequentialFileFactory,
+                                         SequentialFile sequentialFile,
+                                         EncodingSupport encodingSupport,
+                                         int measurements,
+                                         boolean writeSync) throws Exception {
+      //System.gc();
+      TimeUnit.SECONDS.sleep(2);
+      sequentialFileFactory.activateBuffer(sequentialFile);
+      sequentialFile.position(0);
+      final long start = System.nanoTime();
+      for (int i = 0; i < measurements; i++) {
+         write(sequentialFile, encodingSupport, writeSync);
+      }
+      sequentialFileFactory.deactivateBuffer();
+      final long elapsed = System.nanoTime() - start;
+      return elapsed;
+   }
+
+   /**
+    * Writes one message and waits for the completion of its callback.
+    *
+    * @throws IllegalStateException if the file reports that the message does not fit
+    */
+   private static void write(SequentialFile sequentialFile,
+                             EncodingSupport encodingSupport,
+                             boolean sync) throws Exception {
+      //this pattern is necessary to ensure that NIO's TimedBuffer will flush the buffer and know the real size of it
+      if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
+         final FastWaitIOCallback ioCallback = CALLBACK.reset();
+         sequentialFile.write(encodingSupport, sync, ioCallback);
+         ioCallback.waitCompletion();
+      } else {
+         throw new IllegalStateException("can't happen!");
+      }
+   }
+
+   /**
+    * The sequential file factory implementations this benchmark can exercise.
+    */
+   private enum Type {
+
+      Mapped, Nio, Aio
+
+   }
+
+   /**
+    * Reusable {@link IOCallback} whose waiter polls a completion flag instead of blocking on a latch.
+    * A single instance is shared by all the writes of the benchmark: {@link #reset()} must be called
+    * before each use.
+    */
+   private static final class FastWaitIOCallback implements IOCallback {
+
+      private final AtomicBoolean done = new AtomicBoolean(false);
+      private int errorCode = 0;
+      private String errorMessage = null;
+
+      /**
+       * Clears any previous outcome, making the callback ready for a new write.
+       *
+       * @return this instance
+       */
+      public FastWaitIOCallback reset() {
+         errorCode = 0;
+         errorMessage = null;
+         done.lazySet(false);
+         return this;
+      }
+
+      @Override
+      public void done() {
+         errorCode = 0;
+         errorMessage = null;
+         //the flag is set only after the outcome fields have been written
+         done.lazySet(true);
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+         this.errorCode = errorCode;
+         this.errorMessage = errorMessage;
+         //the flag is set only after the outcome fields have been written
+         done.lazySet(true);
+      }
+
+      /**
+       * Polls until {@link #done()} or {@link #onError(int, String)} has been called.
+       *
+       * @throws InterruptedException if the waiting thread is interrupted; the interrupt status is cleared
+       * @throws ActiveMQException    if the operation completed with an error message
+       */
+      public void waitCompletion() throws InterruptedException, ActiveMQException {
+         while (!done.get()) {
+            LockSupport.parkNanos(1L);
+            //Thread.interrupted() clears the interrupt status, as expected when InterruptedException is thrown
+            if (Thread.interrupted()) {
+               throw new InterruptedException();
+            }
+         }
+         if (errorMessage != null) {
+            throw ActiveMQExceptionType.createException(errorCode, errorMessage);
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 7c0a651..51fd6cc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@@ -136,6 +137,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             ActiveMQServerLogger.LOGGER.journalUseAIO();
             journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener);
             break;
+         case MAPPED:
+            ActiveMQServerLogger.LOGGER.journalUseMAPPED();
+            //the mapped version does not need buffering by default
+            journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0);
+            break;
          default:
             throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index c365b7d..7d5822c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1558,8 +1558,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
    void invalidMessageCounterPeriod(long value);
 
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 224073, value = "Using MAPPED Journal", format = Message.Format.MESSAGE_FORMAT)
+   void journalUseMAPPED();
+
    @LogMessage(level = Logger.Level.ERROR)
-   @Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 224074, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
    void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
index 2716a38..df60e9b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java
@@ -40,6 +40,7 @@ public enum JournalType {
       switch (type) {
          case "NIO": return NIO;
          case "ASYNCIO" : return ASYNCIO;
+         case "MAPPED" : return MAPPED;
          default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 66739fe..bc9363f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -584,6 +584,7 @@
                <xsd:restriction base="xsd:string">
                   <xsd:enumeration value="ASYNCIO"/>
                   <xsd:enumeration value="NIO"/>
+                  <xsd:enumeration value="MAPPED"/>
                </xsd:restriction>
             </xsd:simpleType>
          </xsd:element>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/artemis-tools/src/test/resources/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index e538ff0..2676e19 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -576,6 +576,7 @@
                <xsd:restriction base="xsd:string">
                   <xsd:enumeration value="ASYNCIO"/>
                   <xsd:enumeration value="NIO"/>
+                  <xsd:enumeration value="MAPPED"/>
                </xsd:restriction>
             </xsd:simpleType>
          </xsd:element>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
index a220ab6..b0d19b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java
@@ -59,7 +59,8 @@ public class AIOJournalImplTest extends JournalImplTestUnit {
 
       file.mkdir();
 
-      return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false);
+      // forcing the alignment to be 512, as this test was hard coded around this size.
+      return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false).setAlignment(512);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
new file mode 100644
index 0000000..5d54060
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+
+/**
+ * Runs the scenarios inherited from {@link NIOImportExportTest} using a {@link MappedSequentialFileFactory}.
+ */
+public class MappedImportExportTest extends NIOImportExportTest {
+
+   /**
+    * @return a mapped factory created from {@code getTestDirfile()}
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception {
+      final SequentialFileFactory mappedFactory = new MappedSequentialFileFactory(getTestDirfile());
+      return mappedFactory;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
new file mode 100644
index 0000000..32b4b8f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+
+/**
+ * Runs the scenarios inherited from {@link NIOJournalCompactTest} using a {@link MappedSequentialFileFactory}.
+ */
+public class MappedJournalCompactTest extends NIOJournalCompactTest {
+
+   /**
+    * Deletes and re-creates the directory named by {@code getTestDir()} before building the factory.
+    *
+    * @return a mapped factory created from {@code getTestDirfile()}
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception {
+      final File testDirectory = new File(getTestDir());
+      ActiveMQTestBase.deleteDirectory(testDirectory);
+      testDirectory.mkdir();
+
+      final SequentialFileFactory mappedFactory = new MappedSequentialFileFactory(getTestDirfile());
+      return mappedFactory;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aacddfda/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
new file mode 100644
index 0000000..940c8a6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.journal;
+
+import java.io.File;
+
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+/**
+ * Runs the scenarios inherited from {@link JournalImplTestUnit} using a {@link MappedSequentialFileFactory}.
+ */
+public class MappedJournalImplTest extends JournalImplTestUnit {
+
+   /**
+    * Deletes and re-creates the directory named by {@code getTestDir()} before building the factory.
+    *
+    * @return a mapped factory created from {@code getTestDirfile()}
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception {
+      final File testDirectory = new File(getTestDir());
+      deleteDirectory(testDirectory);
+      testDirectory.mkdir();
+
+      final SequentialFileFactory mappedFactory = new MappedSequentialFileFactory(getTestDirfile());
+      return mappedFactory;
+   }
+
+   /**
+    * @return the alignment reported by the file factory under test
+    */
+   @Override
+   protected int getAlignment() {
+      final int factoryAlignment = fileFactory.getAlignment();
+      return factoryAlignment;
+   }
+
+}


Mime
View raw message