activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-623/ARTEMIS-622 Added memory mapped impl of Sequential File + benchs. Added experimental GCFree Journal impl + benchs + Sequentially Encoded Aligned Binary Protocol.
Date Mon, 11 Jul 2016 17:38:40 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 506dbc7ff -> 43596c283


ARTEMIS-623/ARTEMIS-622 Added memory mapped impl of Sequential File + benchs.
Added experimental GCFree Journal impl + benchs + Sequentially Encoded Aligned Binary Protocol.

https://issues.apache.org/jira/browse/ARTEMIS-622
https://issues.apache.org/jira/browse/ARTEMIS-623


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/36555a10
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/36555a10
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/36555a10

Branch: refs/heads/master
Commit: 36555a10c537cfe972ba9be59fe15d75dddcf224
Parents: 506dbc7
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Sat Jun 18 05:47:41 2016 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Jul 11 13:36:47 2016 -0400

----------------------------------------------------------------------
 .../artemis/core/io/mapped/BytesUtils.java      |  49 +++
 .../core/io/mapped/MappedByteBufferCache.java   | 239 ++++++++++
 .../artemis/core/io/mapped/MappedFile.java      | 331 ++++++++++++++
 .../core/io/mapped/MappedSequentialFile.java    | 434 +++++++++++++++++++
 .../io/mapped/MappedSequentialFileFactory.java  | 204 +++++++++
 tests/extra-tests/pom.xml                       |  31 ++
 .../journal/JournalImplLatencyBench.java        | 155 +++++++
 .../journal/gcfree/EncodersBench.java           | 113 +++++
 .../journal/gcfree/GcFreeJournal.java           |  81 ++++
 .../gcfree/GcFreeJournalLatencyBench.java       | 134 ++++++
 .../journal/gcfree/JournalAddRecordEncoder.java | 105 +++++
 .../journal/gcfree/JournalRecordHeader.java     |  27 ++
 .../journal/gcfree/JournalRecordTypes.java      |  29 ++
 .../SequentialFileLatencyBench.java             | 131 ++++++
 14 files changed, 2063 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
new file mode 100644
index 0000000..3ff723f
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.nio.ByteBuffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Low-level helpers for size alignment and for zeroing the memory behind a buffer address.
+ */
+final class BytesUtils {
+
+   private BytesUtils() {
+   }
+
+   /**
+    * Rounds {@code value} up to a multiple of {@code alignment}.
+    * <p>
+    * The result is computed with a bit mask built from {@code alignment - 1}, so it is a
+    * multiple of {@code alignment} only when {@code alignment} is a power of two.
+    *
+    * @param value     the value to round up
+    * @param alignment the alignment boundary
+    * @return {@code (value + (alignment - 1)) & ~(alignment - 1)}
+    */
+   public static long align(final long value, final long alignment) {
+      final long mask = alignment - 1;
+      return (value + mask) & ~mask;
+   }
+
+   /**
+    * Writes zeros over {@code buffer.capacity()} bytes starting at the address returned by
+    * {@code PlatformDependent.directBufferAddress(buffer)}.
+    * <p>
+    * The buffer's position and limit are neither read nor modified.
+    *
+    * @param buffer the buffer whose whole capacity is zeroed
+    */
+   public static void zerosDirect(final ByteBuffer buffer) {
+      //TODO When PlatformDependent will be replaced by VarHandle or Unsafe, replace with safepoint-fixed setMemory
+      //DANGEROUS!! raw addresses bypass bounds checking -> safe only if it uses counted loops
+      final int capacity = buffer.capacity();
+      final long base = PlatformDependent.directBufferAddress(buffer);
+      //largest multiple of 8 that does not exceed the capacity: cleared one long at a time
+      final int wordBytes = capacity & ~7;
+      for (int offset = 0; offset < wordBytes; offset += 8) {
+         PlatformDependent.putLong(base + offset, 0L);
+      }
+      //the trailing (capacity % 8) bytes are cleared one byte at a time
+      for (int offset = wordBytes; offset < capacity; offset++) {
+         PlatformDependent.putByte(base + offset, (byte) 0);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java
new file mode 100644
index 0000000..dee951d
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedByteBufferCache.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.ref.WeakReference;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ArrayList;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Maps a file in fixed-size chunks and caches the resulting {@link MappedByteBuffer}s through
+ * weak references, indexed by chunk number.
+ * <p>
+ * Every mapping starts at {@code index * chunkBytes} and spans {@code chunkBytes + overlapBytes}
+ * bytes; both sizes are rounded up with {@link BytesUtils#align(long, long)} using {@link #PAGE_SIZE}.
+ */
+final class MappedByteBufferCache implements AutoCloseable {
+
+   /**
+    * Page size used to align chunk and overlap sizes; read from the {@code os_page_size}
+    * system property, defaulting to 4096.
+    */
+   public static final int PAGE_SIZE = Integer.parseInt(System.getProperty("os_page_size", "4096"));
+   //monitor shared by every instance: held around each FileLock acquisition + file length change
+   private static final Object FILE_LOCK = new Object();
+   private final RandomAccessFile raf;
+   private final FileChannel fileChannel;
+   private final long chunkBytes;
+   private final long overlapBytes;
+   //slot i holds a weak reference to the mapping of chunk i, or null if it was never mapped
+   private final ArrayList<WeakReference<MappedByteBuffer>> byteBuffers;
+   private final File file;
+   //size of each mapping: chunkBytes + overlapBytes
+   private final long mappedSize;
+   private boolean closed;
+
+   private MappedByteBufferCache(File file, RandomAccessFile raf, long chunkBytes, long overlapBytes, long alignment) {
+      this.byteBuffers = new ArrayList<>();
+      this.file = file;
+      this.raf = raf;
+      this.fileChannel = raf.getChannel();
+      this.chunkBytes = BytesUtils.align(chunkBytes, alignment);
+      this.overlapBytes = BytesUtils.align(overlapBytes, alignment);
+      this.closed = false;
+      this.mappedSize = this.chunkBytes + this.overlapBytes;
+   }
+
+   /**
+    * Opens {@code file} in "rw" mode and creates a cache over it.
+    *
+    * @param file        the file to map
+    * @param chunkSize   requested chunk size in bytes (rounded up using {@link #PAGE_SIZE})
+    * @param overlapSize requested overlap size in bytes (rounded up using {@link #PAGE_SIZE})
+    * @throws FileNotFoundException if the file cannot be opened
+    */
+   public static MappedByteBufferCache of(File file, long chunkSize, long overlapSize) throws FileNotFoundException {
+      final RandomAccessFile raf = new RandomAccessFile(file, "rw");
+      return new MappedByteBufferCache(file, raf, chunkSize, overlapSize, PAGE_SIZE);
+   }
+
+   /**
+    * @return {@code true} when {@code mappedPosition <= position < mappedLimit}
+    */
+   public static boolean inside(long position, long mappedPosition, long mappedLimit) {
+      return mappedPosition <= position && position < mappedLimit;
+   }
+
+   public File file() {
+      return file;
+   }
+
+   /**
+    * @return the aligned chunk size in bytes
+    */
+   public long chunkBytes() {
+      return chunkBytes;
+   }
+
+   /**
+    * @return the aligned overlap size in bytes
+    */
+   public long overlapBytes() {
+      return overlapBytes;
+   }
+
+   /**
+    * @return the index of the chunk that contains {@code position}
+    */
+   public int indexFor(long position) {
+      return (int) (position / chunkBytes);
+   }
+
+   /**
+    * @return the file position at which the chunk {@code index} starts
+    */
+   public long mappedPositionFor(int index) {
+      return index * chunkBytes;
+   }
+
+   /**
+    * @return the exclusive end of the chunk starting at {@code mappedPosition}, overlap excluded
+    */
+   public long mappedLimitFor(long mappedPosition) {
+      return mappedPosition + chunkBytes;
+   }
+
+   /**
+    * Returns the mapping of the chunk {@code index}, reusing the cached one while its weak
+    * reference is still populated and mapping the chunk otherwise.
+    *
+    * @throws IOException if the cache is closed, {@code index} is negative, or resizing/mapping fails
+    */
+   public MappedByteBuffer acquireMappedByteBuffer(final int index) throws IOException, IllegalArgumentException, IllegalStateException {
+      if (closed)
+         throw new IOException("Closed");
+      if (index < 0)
+         throw new IOException("Attempt to access a negative index: " + index);
+      //grow the slot list so that "index" is addressable
+      while (byteBuffers.size() <= index) {
+         byteBuffers.add(null);
+      }
+      final WeakReference<MappedByteBuffer> mbbRef = byteBuffers.get(index);
+      if (mbbRef != null) {
+         final MappedByteBuffer mbb = mbbRef.get();
+         if (mbb != null) {
+            return mbb;
+         }
+      }
+      return mapAndAcquire(index);
+   }
+
+   //METHOD BUILT TO SEPARATE THE SLOW PATH TO ENSURE INLINING OF THE MOST OCCURRING CASE
+   private MappedByteBuffer mapAndAcquire(final int index) throws IOException {
+      final long chunkStartPosition = mappedPositionFor(index);
+      final long minSize = chunkStartPosition + mappedSize;
+      //the file must be long enough to back the whole mapping (chunk + overlap)
+      if (fileChannel.size() < minSize) {
+         try {
+            resizeUnderLock(minSize, true);
+         }
+         catch (IOException ioe) {
+            throw new IOException("Failed to resize to " + minSize, ioe);
+         }
+      }
+
+      final MappedByteBuffer mbb = fileChannel.map(FileChannel.MapMode.READ_WRITE, chunkStartPosition, mappedSize);
+      mbb.order(ByteOrder.nativeOrder());
+      byteBuffers.set(index, new WeakReference<>(mbb));
+      return mbb;
+   }
+
+   /**
+    * Changes the file length while holding both {@link #FILE_LOCK} and a {@link FileLock} on the channel.
+    * The size is read again under the locks before deciding whether to resize.
+    *
+    * @param newLength the length to set
+    * @param growOnly  when {@code true} the file is resized only if it is shorter than {@code newLength},
+    *                  otherwise whenever its size differs from {@code newLength}
+    */
+   private void resizeUnderLock(final long newLength, final boolean growOnly) throws IOException {
+      synchronized (FILE_LOCK) {
+         try (FileLock ignored = fileChannel.lock()) {
+            final long size = fileChannel.size();
+            final boolean mustResize = growOnly ? size < newLength : size != newLength;
+            if (mustResize) {
+               raf.setLength(newLength);
+            }
+         }
+      }
+   }
+
+   /**
+    * @return the current size of the underlying channel
+    * @throws IllegalStateException if the cache is closed
+    */
+   public long fileSize() throws IOException {
+      if (closed)
+         throw new IllegalStateException("Closed");
+      return fileChannel.size();
+   }
+
+   /**
+    * Releases every cached mapping that is still reachable, sets the file length to {@code length}
+    * if it differs, then closes the channel and the file. Does nothing if already closed.
+    *
+    * @throws IllegalStateException if reading the size, resizing or closing fails
+    */
+   public void closeAndResize(long length) {
+      if (closed) {
+         return;
+      }
+      unmapAll();
+      try {
+         if (fileChannel.size() != length) {
+            try {
+               resizeUnderLock(length, false);
+            }
+            catch (IOException ioe) {
+               throw new IllegalStateException("Failed to resize to " + length, ioe);
+            }
+         }
+      }
+      catch (IOException ex) {
+         throw new IllegalStateException("Failed to get size", ex);
+      }
+      finally {
+         closeFile();
+      }
+   }
+
+   public boolean isClosed() {
+      return closed;
+   }
+
+   /**
+    * Releases every cached mapping that is still reachable, then closes the channel and the file.
+    * Does nothing if already closed.
+    *
+    * @throws IllegalStateException if closing the channel or the file fails
+    */
+   public void close() {
+      if (closed) {
+         return;
+      }
+      unmapAll();
+      closeFile();
+   }
+
+   /**
+    * Frees each mapping whose weak reference is still populated and empties the slot list.
+    */
+   private void unmapAll() {
+      //TO_FIX: unmap in this way is not portable BUT required on Windows that can't resize a memory mapped file!
+      for (final WeakReference<MappedByteBuffer> mbbRef : byteBuffers) {
+         final MappedByteBuffer mbb = mbbRef == null ? null : mbbRef.get();
+         if (mbb != null) {
+            try {
+               PlatformDependent.freeDirectBuffer(mbb);
+            }
+            catch (Throwable ignored) {
+               //TO_FIX: deliberately best-effort, a failure must not prevent releasing the other buffers
+            }
+         }
+      }
+      byteBuffers.clear();
+   }
+
+   /**
+    * Closes the channel and then the file, attempting the second close even if the first fails.
+    * The cache is marked closed in every case: by this point the mappings have been released and
+    * the slot list cleared, so the instance must not be used again even when a close call throws.
+    */
+   private void closeFile() {
+      try {
+         try {
+            fileChannel.close();
+         }
+         catch (IOException e) {
+            throw new IllegalStateException("Failed to close channel", e);
+         }
+         finally {
+            try {
+               raf.close();
+            }
+            catch (IOException e) {
+               throw new IllegalStateException("Failed to close RandomAccessFile", e);
+            }
+         }
+      }
+      finally {
+         closed = true;
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
new file mode 100644
index 0000000..72221f4
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * File-like view (position, length, read, write, zero-fill) over a {@link MappedByteBufferCache}.
+ * <p>
+ * The most recently used mapping is kept in {@code lastMapped} together with the file range it
+ * covers, so consecutive accesses to the same chunk do not go through the cache again.
+ */
+final class MappedFile implements AutoCloseable {
+
+   //source of the zero bytes copied by zeros(): a direct buffer is zero-filled on allocation
+   private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder());
+
+   private final MappedByteBufferCache cache;
+   //maximum number of bytes zeroed by a single copy in zeros(); always >= 1
+   private final int zerosMaxPage;
+   private MappedByteBuffer lastMapped;
+   private long lastMappedStart;
+   private long lastMappedLimit;
+   private long position;
+   private long length;
+
+   private MappedFile(MappedByteBufferCache cache) throws IOException {
+      this.cache = cache;
+      this.lastMapped = null;
+      this.lastMappedStart = -1;
+      this.lastMappedLimit = -1;
+      this.position = 0;
+      this.length = this.cache.fileSize();
+      final int overlapLimited = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes()));
+      //never 0: zeros() uses this value as its step, a zero overlap would otherwise make it unusable
+      this.zerosMaxPage = Math.max(1, overlapLimited);
+   }
+
+   /**
+    * Opens {@code file} through a new {@link MappedByteBufferCache}.
+    * If the instance cannot be created the cache just opened is closed before rethrowing.
+    *
+    * @param file        the file to map
+    * @param chunckSize  requested chunk size in bytes
+    * @param overlapSize requested overlap size in bytes
+    * @throws IOException if the file cannot be opened or its size cannot be read
+    */
+   public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
+      final MappedByteBufferCache cache = MappedByteBufferCache.of(file, chunckSize, overlapSize);
+      try {
+         return new MappedFile(cache);
+      }
+      catch (IOException | RuntimeException e) {
+         try {
+            cache.close();
+         }
+         catch (RuntimeException closeFailure) {
+            e.addSuppressed(closeFailure);
+         }
+         throw e;
+      }
+   }
+
+   public MappedByteBufferCache cache() {
+      return cache;
+   }
+
+   /**
+    * Makes {@code lastMapped} the mapping of the chunk containing {@code offset} and returns the
+    * index of {@code offset} inside that mapping.
+    * <p>
+    * NOTE(review): {@code bytes} is validated against the chunk limit only when a different chunk
+    * has to be acquired; when {@code offset} falls in the current chunk no length check is made.
+    *
+    * @throws IOException if the access would cross the limit of a newly acquired chunk, or the
+    *                     cache fails to provide the mapping
+    */
+   private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
+      if (MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
+         return (int) (offset - lastMappedStart);
+      }
+      try {
+         final int index = cache.indexFor(offset);
+         final long mappedPosition = cache.mappedPositionFor(index);
+         final long mappedLimit = cache.mappedLimitFor(mappedPosition);
+         if (offset + bytes > mappedLimit) {
+            throw new IOException("mapping overflow!");
+         }
+         lastMapped = cache.acquireMappedByteBuffer(index);
+         lastMappedStart = mappedPosition;
+         lastMappedLimit = mappedLimit;
+         return (int) (offset - mappedPosition);
+      }
+      catch (IllegalStateException e) {
+         throw new IOException(e);
+      }
+      catch (IllegalArgumentException e) {
+         throw new BufferUnderflowException();
+      }
+   }
+
+   /**
+    * Copies {@code length} bytes from a native address into {@code dst}, starting at buffer index {@code dstStart}.
+    */
+   private static void copyOut(long srcAddress, ByteBuf dst, int dstStart, int length) {
+      if (dst.hasMemoryAddress()) {
+         PlatformDependent.copyMemory(srcAddress, dst.memoryAddress() + dstStart, length);
+      }
+      else if (dst.hasArray()) {
+         //buffer index 0 lives at arrayOffset() inside the backing array
+         PlatformDependent.copyMemory(srcAddress, dst.array(), dst.arrayOffset() + dstStart, length);
+      }
+      else {
+         throw new IllegalArgumentException("unsupported byte buffer");
+      }
+   }
+
+   /**
+    * Copies {@code length} bytes from a native address into {@code dst}, starting at buffer index {@code dstStart}.
+    */
+   private static void copyOut(long srcAddress, ByteBuffer dst, int dstStart, int length) {
+      if (dst.isDirect()) {
+         PlatformDependent.copyMemory(srcAddress, PlatformDependent.directBufferAddress(dst) + dstStart, length);
+      }
+      else {
+         //buffer index 0 lives at arrayOffset() inside the backing array
+         PlatformDependent.copyMemory(srcAddress, dst.array(), dst.arrayOffset() + dstStart, length);
+      }
+   }
+
+   /**
+    * Copies {@code length} bytes of {@code src}, starting at buffer index {@code srcStart}, to a native address.
+    */
+   private static void copyIn(ByteBuf src, int srcStart, long destAddress, int length) {
+      if (src.hasMemoryAddress()) {
+         PlatformDependent.copyMemory(src.memoryAddress() + srcStart, destAddress, length);
+      }
+      else if (src.hasArray()) {
+         //buffer index 0 lives at arrayOffset() inside the backing array
+         PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcStart, destAddress, length);
+      }
+      else {
+         throw new IllegalArgumentException("unsupported byte buffer");
+      }
+   }
+
+   /**
+    * Copies {@code length} bytes of {@code src}, starting at buffer index {@code srcStart}, to a native address.
+    */
+   private static void copyIn(ByteBuffer src, int srcStart, long destAddress, int length) {
+      if (src.isDirect()) {
+         PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcStart, destAddress, length);
+      }
+      else {
+         //buffer index 0 lives at arrayOffset() inside the backing array
+         PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcStart, destAddress, length);
+      }
+   }
+
+   /**
+    * Raises the tracked length to {@code end} when {@code end} lies beyond it.
+    */
+   private void extendLength(long end) {
+      if (end > this.length) {
+         this.length = end;
+      }
+   }
+
+   /**
+    * Number of bytes a relative read may return: the smaller of {@code requested} and the bytes
+    * between the current position and the tracked length, never negative.
+    */
+   private int readableBytes(int requested) {
+      final long available = Math.max(0L, this.length - this.position);
+      return (int) Math.min(available, (long) Math.max(0, requested));
+   }
+
+   /**
+    * Forces the most recently used mapping, if any; other mappings are not touched.
+    */
+   public void force() {
+      if (lastMapped != null) {
+         lastMapped.force();
+      }
+   }
+
+   /**
+    * Reads a sequence of bytes from this file into the given buffer.
+    * <p>
+    * <p> Bytes are read starting at this file's specified position; the current position is not changed.
+    * NOTE(review): as in the write methods, the tracked length grows when the read ends beyond it.
+    *
+    * @return {@code dstLength}
+    */
+   public int read(long position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
+      final int bufferPosition = checkOffset(position, dstLength);
+      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyOut(srcAddress, dst, dstStart, dstLength);
+      extendLength(position + dstLength);
+      return dstLength;
+   }
+
+   /**
+    * Reads a sequence of bytes from this file into the given buffer.
+    * <p>
+    * <p> Bytes are read starting at this file's specified position; the current position is not changed.
+    * NOTE(review): as in the write methods, the tracked length grows when the read ends beyond it.
+    *
+    * @return {@code dstLength}
+    */
+   public int read(long position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
+      final int bufferPosition = checkOffset(position, dstLength);
+      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyOut(srcAddress, dst, dstStart, dstLength);
+      extendLength(position + dstLength);
+      return dstLength;
+   }
+
+   /**
+    * Reads a sequence of bytes from this file into the given buffer.
+    * <p>
+    * <p> Bytes are read starting at this file's current position, and
+    * then the position is updated with the number of bytes actually read.
+    *
+    * @return the number of bytes read; 0 when the position is at or beyond the tracked length
+    */
+   public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException {
+      final int read = readableBytes(dstLength);
+      if (read == 0) {
+         //nothing to copy: do not map (and possibly grow) the file for an empty read
+         return 0;
+      }
+      final int bufferPosition = checkOffset(position, read);
+      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyOut(srcAddress, dst, dstStart, read);
+      position += read;
+      return read;
+   }
+
+   /**
+    * Reads a sequence of bytes from this file into the given buffer.
+    * <p>
+    * <p> Bytes are read starting at this file's current position, and
+    * then the position is updated with the number of bytes actually read.
+    *
+    * @return the number of bytes read; 0 when the position is at or beyond the tracked length
+    */
+   public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException {
+      final int read = readableBytes(dstLength);
+      if (read == 0) {
+         //nothing to copy: do not map (and possibly grow) the file for an empty read
+         return 0;
+      }
+      final int bufferPosition = checkOffset(position, read);
+      final long srcAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyOut(srcAddress, dst, dstStart, read);
+      position += read;
+      return read;
+   }
+
+   /**
+    * Writes a sequence of bytes to this file from the given buffer.
+    * <p>
+    * <p> Bytes are written starting at this file's current position, which is then advanced by
+    * {@code srcLength}; the tracked length grows if the write ends beyond it.
+    */
+   public void write(ByteBuf src, int srcStart, int srcLength) throws IOException {
+      final int bufferPosition = checkOffset(position, srcLength);
+      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyIn(src, srcStart, destAddress, srcLength);
+      position += srcLength;
+      extendLength(position);
+   }
+
+   /**
+    * Writes a sequence of bytes to this file from the given buffer.
+    * <p>
+    * <p> Bytes are written starting at this file's current position, which is then advanced by
+    * {@code srcLength}; the tracked length grows if the write ends beyond it.
+    */
+   public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException {
+      final int bufferPosition = checkOffset(position, srcLength);
+      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyIn(src, srcStart, destAddress, srcLength);
+      position += srcLength;
+      extendLength(position);
+   }
+
+   /**
+    * Writes a sequence of bytes to this file from the given buffer.
+    * <p>
+    * <p> Bytes are written starting at this file's specified position; the current position is
+    * not changed and the tracked length grows if the write ends beyond it.
+    */
+   public void write(long position, ByteBuf src, int srcStart, int srcLength) throws IOException {
+      final int bufferPosition = checkOffset(position, srcLength);
+      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyIn(src, srcStart, destAddress, srcLength);
+      extendLength(position + srcLength);
+   }
+
+   /**
+    * Writes a sequence of bytes to this file from the given buffer.
+    * <p>
+    * <p> Bytes are written starting at this file's specified position; the current position is
+    * not changed and the tracked length grows if the write ends beyond it.
+    */
+   public void write(long position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
+      final int bufferPosition = checkOffset(position, srcLength);
+      final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+      copyIn(src, srcStart, destAddress, srcLength);
+      extendLength(position + srcLength);
+   }
+
+   /**
+    * Writes {@code count} zero bytes to this file.
+    * <p>
+    * <p> Bytes are written starting at the specified {@code offset}, in copies of at most
+    * {@code zerosMaxPage} bytes; the current position is not changed and the tracked length grows
+    * if the zeroed range ends beyond it.
+    */
+   public void zeros(long offset, int count) throws IOException {
+      final long targetOffset = offset + count;
+      final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE);
+      long current = offset;
+      //only non-empty copies are issued, so no chunk beyond the zeroed range gets mapped
+      while (current < targetOffset) {
+         final int bytes = (int) Math.min((long) zerosMaxPage, targetOffset - current);
+         final int bufferPosition = checkOffset(current, bytes);
+         final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
+         PlatformDependent.copyMemory(srcAddress, destAddress, bytes);
+         current += bytes;
+      }
+      extendLength(targetOffset);
+   }
+
+   public long position() {
+      return position;
+   }
+
+   /**
+    * Sets the current position.
+    *
+    * @return the previous position
+    */
+   public long position(long newPosition) {
+      final long oldPosition = this.position;
+      this.position = newPosition;
+      return oldPosition;
+   }
+
+   /**
+    * @return the tracked length: the file size read at creation, raised by accesses that ended beyond it
+    */
+   public long length() {
+      return length;
+   }
+
+   @Override
+   public void close() {
+      cache.close();
+   }
+
+   /**
+    * Closes the underlying cache, asking it to set the file length to {@code length}.
+    */
+   public void closeAndResize(long length) {
+      cache.closeAndResize(length);
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
new file mode 100644
index 0000000..c890ff4
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
+final class MappedSequentialFile implements SequentialFile {
+
+   private final File directory;
+   private final long chunkBytes;
+   private final long overlapBytes;
+   private final IOCriticalErrorListener criticalErrorListener;
+   private File file;
+   private File absoluteFile;
+   private String fileName;
+   private MappedFile mappedFile;
+   private ActiveMQBuffer pooledActiveMQBuffer;
+
+   MappedSequentialFile(final File directory,
+                        final File file,
+                        final long chunkBytes,
+                        final long overlapBytes,
+                        final IOCriticalErrorListener criticalErrorListener) {
+      this.directory = directory;
+      this.file = file;
+      this.absoluteFile = null;
+      this.fileName = null;
+      this.chunkBytes = chunkBytes;
+      this.overlapBytes = overlapBytes;
+      this.mappedFile = null;
+      this.pooledActiveMQBuffer = null;
+      this.criticalErrorListener = criticalErrorListener;
+   }
+
+   private void checkIsOpen() {
+      if (!isOpen()) {
+         throw new IllegalStateException("must be open!");
+      }
+   }
+
+   private void checkIsNotOpen() {
+      if (isOpen()) {
+         throw new IllegalStateException("must be closed!");
+      }
+   }
+
+   @Override
+   public boolean isOpen() {
+      return this.mappedFile != null;
+   }
+
+   @Override
+   public boolean exists() {
+      return this.file.exists();
+   }
+
+   @Override
+   public void open() throws IOException {
+      if (this.mappedFile == null) {
+         this.mappedFile = MappedFile.of(file, chunkBytes, overlapBytes);
+      }
+   }
+
+   @Override
+   public void open(int maxIO, boolean useExecutor) throws IOException {
+      // maxIO and useExecutor are ignored: a warning is logged, then the call delegates to open()
+      ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!");
+      this.open();
+   }
+
+   @Override
+   public boolean fits(int size) {
+      checkIsOpen();
+      final long newPosition = this.mappedFile.position() + size;
+      final boolean hasRemaining = newPosition <= this.mappedFile.length();
+      return hasRemaining;
+   }
+
+   @Override
+   public int getAlignment() {
+      return 0; // NOTE(review): MappedSequentialFileFactory.getAlignment() returns 1 - confirm 0 is intended here
+   }
+
+   @Override
+   public int calculateBlockStart(int position) {
+      return position;
+   }
+
+   @Override
+   public String getFileName() {
+      if (this.fileName == null) {
+         this.fileName = this.file.getName();
+      }
+      return this.fileName;
+   }
+
+   @Override
+   public void fill(int size) throws IOException {
+      checkIsOpen();
+      this.mappedFile.zeros(this.mappedFile.position(), size);
+   }
+
+   @Override
+   public void delete() {
+      checkIsNotOpen();
+      if (file.exists() && !file.delete()) {
+         ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
+      }
+   }
+
+   @Override
+   public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException {
+      checkIsOpen();
+      if (callback == null) {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+      try {
+         final ByteBuf byteBuf = bytes.byteBuf();
+         final int writerIndex = byteBuf.writerIndex();
+         final int readerIndex = byteBuf.readerIndex();
+         final int readableBytes = writerIndex - readerIndex;
+         if (readableBytes > 0) {
+            this.mappedFile.write(byteBuf, readerIndex, readableBytes);
+            if (sync) {
+               this.mappedFile.force();
+            }
+         }
+         callback.done();
+      }
+      catch (IOException e) {
+         if (this.criticalErrorListener != null) {
+            this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+         throw e;
+      }
+   }
+
+   @Override
+   public void write(ActiveMQBuffer bytes, boolean sync) throws IOException {
+      checkIsOpen();
+      final ByteBuf byteBuf = bytes.byteBuf();
+      final int writerIndex = byteBuf.writerIndex();
+      final int readerIndex = byteBuf.readerIndex();
+      final int readableBytes = writerIndex - readerIndex;
+      if (readableBytes > 0) {
+         this.mappedFile.write(byteBuf, readerIndex, readableBytes);
+         if (sync) {
+            this.mappedFile.force();
+         }
+      }
+   }
+
+   private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) {
+      if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) {
+         this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder()));
+      }
+      else {
+         this.pooledActiveMQBuffer.clear();
+      }
+      return pooledActiveMQBuffer;
+   }
+
+   @Override
+   public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException {
+      checkIsOpen();
+      if (callback == null) {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+      try {
+         final int encodedSize = bytes.getEncodeSize();
+         final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
+         bytes.encode(outBuffer);
+         final ByteBuf byteBuf = outBuffer.byteBuf();
+         final int writerIndex = byteBuf.writerIndex();
+         final int readerIndex = byteBuf.readerIndex();
+         final int readableBytes = writerIndex - readerIndex;
+         if (readableBytes > 0) {
+            this.mappedFile.write(byteBuf, readerIndex, readableBytes);
+            if (sync) {
+               this.mappedFile.force();
+            }
+         }
+         callback.done();
+      }
+      catch (IOException e) {
+         if (this.criticalErrorListener != null) {
+            this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+         throw e;
+      }
+   }
+
+   @Override
+   public void write(EncodingSupport bytes, boolean sync) throws IOException {
+      checkIsOpen();
+      final int encodedSize = bytes.getEncodeSize();
+      final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize);
+      bytes.encode(outBuffer);
+      final ByteBuf byteBuf = outBuffer.byteBuf();
+      final int writerIndex = byteBuf.writerIndex();
+      final int readerIndex = byteBuf.readerIndex();
+      final int readableBytes = writerIndex - readerIndex;
+      if (readableBytes > 0) {
+         this.mappedFile.write(byteBuf, readerIndex, readableBytes);
+         if (sync) {
+            this.mappedFile.force();
+         }
+      }
+   }
+
+   @Override
+   public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+      checkIsOpen();
+      if (callback == null) {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+      try {
+         final int position = bytes.position();
+         final int limit = bytes.limit();
+         final int remaining = limit - position;
+         if (remaining > 0) {
+            this.mappedFile.write(bytes, position, remaining);
+            if (sync) {
+               this.mappedFile.force();
+            }
+         }
+         callback.done();
+      }
+      catch (IOException e) {
+         if (this.criticalErrorListener != null) {
+            this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   public void writeDirect(ByteBuffer bytes, boolean sync) throws IOException {
+      checkIsOpen();
+      final int position = bytes.position();
+      final int limit = bytes.limit();
+      final int remaining = limit - position;
+      if (remaining > 0) {
+         this.mappedFile.write(bytes, position, remaining);
+         if (sync) {
+            this.mappedFile.force();
+         }
+      }
+   }
+
+   @Override
+   public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
+      checkIsOpen();
+      if (callback == null) {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+      try {
+         final int position = bytes.position();
+         final int remaining = bytes.limit() - position;
+         int bytesRead = 0;
+         if (remaining > 0) {
+            bytesRead = this.mappedFile.read(bytes, position, remaining);
+            bytes.position(position + bytesRead);
+            bytes.flip();
+         }
+         // Always complete the callback, even when the buffer had no room to read into:
+         // otherwise a caller waiting on the callback would never be notified.
+         callback.done();
+         return bytesRead;
+      }
+      catch (IOException e) {
+         if (this.criticalErrorListener != null) {
+            this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+         callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
+         throw e;
+      }
+   }
+
+   @Override
+   public int read(ByteBuffer bytes) throws IOException {
+      checkIsOpen();
+      final int position = bytes.position();
+      final int limit = bytes.limit();
+      final int remaining = limit - position;
+      if (remaining > 0) {
+         final int bytesRead = this.mappedFile.read(bytes, position, remaining);
+         final int newPosition = position + bytesRead;
+         bytes.position(newPosition);
+         bytes.flip();
+         return bytesRead;
+      }
+      return 0;
+   }
+
+   @Override
+   public void position(long pos) {
+      checkIsOpen();
+      this.mappedFile.position(pos);
+   }
+
+   @Override
+   public long position() {
+      checkIsOpen();
+      return this.mappedFile.position();
+   }
+
+   @Override
+   public void close() {
+      if (this.mappedFile != null) {
+         this.mappedFile.closeAndResize(this.mappedFile.length());
+         this.mappedFile = null;
+      }
+   }
+
+   @Override
+   public void sync() throws IOException {
+      checkIsOpen();
+      this.mappedFile.force();
+   }
+
+   @Override
+   public long size() {
+      if (this.mappedFile != null) {
+         return this.mappedFile.length();
+      }
+      else {
+         return this.file.length();
+      }
+   }
+
+   @Override
+   public void renameTo(String newFileName) throws Exception {
+      checkIsNotOpen();
+      if (this.fileName == null) {
+         this.fileName = this.file.getName();
+      }
+      if (!this.fileName.contentEquals(newFileName)) {
+         final File newFile = new File(this.directory, newFileName);
+         if (!file.renameTo(newFile)) {
+            throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName);
+         }
+         else {
+            this.file = newFile;
+            this.fileName = newFileName;
+            this.absoluteFile = null;
+         }
+      }
+   }
+
+   @Override
+   public SequentialFile cloneFile() {
+      checkIsNotOpen();
+      return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
+   }
+
+   @Override
+   public void copyTo(SequentialFile dstFile) throws IOException {
+      checkIsNotOpen();
+      if (dstFile.isOpen()) {
+         throw new IllegalArgumentException("dstFile must be closed too");
+      }
+      try (RandomAccessFile src = new RandomAccessFile(file, "rw");
+           FileChannel srcChannel = src.getChannel();
+           FileLock srcLock = srcChannel.lock()) {
+         final long readableBytes = srcChannel.size();
+         if (readableBytes > 0) {
+            try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
+                 FileChannel dstChannel = dst.getChannel();
+                 FileLock dstLock = dstChannel.lock()) {
+               final long oldLength = dst.length();
+               final long newLength = oldLength + readableBytes;
+               dst.setLength(newLength);
+               final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
+               if (transferred != readableBytes) {
+                  dstChannel.truncate(oldLength);
+                  throw new IOException("copied less then expected");
+               }
+            }
+         }
+      }
+   }
+
+   @Override
+   @Deprecated
+   public void setTimedBuffer(TimedBuffer buffer) {
+      throw new UnsupportedOperationException("the timed buffer is not currently supported");
+   }
+
+   @Override
+   public File getJavaFile() {
+      if (this.absoluteFile == null) {
+         this.absoluteFile = this.file.getAbsoluteFile();
+      }
+      return this.absoluteFile;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
new file mode 100644
index 0000000..4dc3206
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io.mapped;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+
+public final class MappedSequentialFileFactory implements SequentialFileFactory {
+
+   private static final long DEFAULT_BLOCK_SIZE = 64L << 20; // 64 MiB; a constant, so it must not be reassignable
+   private final File directory;
+   private final IOCriticalErrorListener criticalErrorListener;
+   private long chunkBytes;
+   private long overlapBytes;
+
+   public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
+      this.directory = directory;
+      this.criticalErrorListener = criticalErrorListener;
+      this.chunkBytes = DEFAULT_BLOCK_SIZE;
+      this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
+   }
+
+   public MappedSequentialFileFactory(File directory) {
+      this.directory = directory;
+      this.criticalErrorListener = null;
+      this.chunkBytes = DEFAULT_BLOCK_SIZE;
+      this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
+   }
+
+   public long chunkBytes() {
+      return chunkBytes;
+   }
+
+   public MappedSequentialFileFactory chunkBytes(long chunkBytes) {
+      this.chunkBytes = chunkBytes;
+      return this;
+   }
+
+   public long overlapBytes() {
+      return overlapBytes;
+   }
+
+   public MappedSequentialFileFactory overlapBytes(long overlapBytes) {
+      this.overlapBytes = overlapBytes;
+      return this;
+   }
+
+   @Override
+   public SequentialFile createSequentialFile(String fileName) {
+      return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+   }
+
+   @Override
+   public int getMaxIO() {
+      return 0;
+   }
+
+   @Override
+   public List<String> listFiles(final String extension) throws Exception {
+      // build the suffix once instead of concatenating it on every accept() call
+      final String suffix = "." + extension;
+      final FilenameFilter extensionFilter = new FilenameFilter() {
+         @Override
+         public boolean accept(final File file, final String name) {
+            return name.endsWith(suffix);
+         }
+      };
+      final String[] fileNames = directory.list(extensionFilter);
+      // File.list yields null for a missing/unreadable directory: report no files, using a type-safe empty list
+      return fileNames == null ? Collections.<String>emptyList() : Arrays.asList(fileNames);
+   }
+
+   @Override
+   public boolean isSupportsCallbacks() {
+      return false;
+   }
+
+   @Override
+   public void onIOError(Exception exception, String message, SequentialFile file) {
+      if (criticalErrorListener != null) {
+         criticalErrorListener.onIOException(exception, message, file);
+      }
+   }
+
+   @Override
+   public ByteBuffer allocateDirectBuffer(final int size) {
+      return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
+   }
+
+   @Override
+   public void releaseDirectBuffer(final ByteBuffer buffer) {
+      PlatformDependent.freeDirectBuffer(buffer);
+   }
+
+   @Override
+   public ByteBuffer newBuffer(final int size) {
+      return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
+   }
+
+   @Override
+   public void releaseBuffer(ByteBuffer buffer) {
+      if (buffer.isDirect()) {
+         PlatformDependent.freeDirectBuffer(buffer);
+      }
+   }
+
+   @Override
+   public void activateBuffer(SequentialFile file) {
+
+   }
+
+   @Override
+   public void deactivateBuffer() {
+
+   }
+
+   @Override
+   public ByteBuffer wrapBuffer(final byte[] bytes) {
+      return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+   }
+
+   @Override
+   public int getAlignment() {
+      return 1;
+   }
+
+   @Override
+   public int calculateBlockSize(int bytes) {
+      return bytes;
+   }
+
+   @Override
+   public File getDirectory() {
+      return this.directory;
+   }
+
+   @Override
+   public void clearBuffer(final ByteBuffer buffer) {
+      buffer.clear();
+      if (buffer.isDirect()) {
+         BytesUtils.zerosDirect(buffer);
+      }
+      else if (buffer.hasArray()) {
+         // SIMD-friendly fill, limited to this buffer's own window: a slice shares its backing array with others
+         final int from = buffer.arrayOffset();
+         Arrays.fill(buffer.array(), from, from + buffer.capacity(), (byte) 0);
+      }
+      else {
+         //TODO verify whether this branch can actually be reached
+         final int capacity = buffer.capacity();
+         for (int i = 0; i < capacity; i++) {
+            buffer.put(i, (byte) 0);
+         }
+      }
+   }
+
+   @Override
+   public void start() {
+
+   }
+
+   @Override
+   public void stop() {
+
+   }
+
+   @Override
+   public void createDirs() throws Exception {
+      // mkdirs() also returns false when the directory already exists: only fail if it is really not there
+      if (!directory.mkdirs() && !directory.isDirectory()) {
+         throw new IOException("Failed to create directory " + directory);
+      }
+   }
+
+   @Override
+   public void flush() {
+
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
index 1da200a..bd00901 100644
--- a/tests/extra-tests/pom.xml
+++ b/tests/extra-tests/pom.xml
@@ -38,6 +38,9 @@
       <activemq.basedir>${project.basedir}/../..</activemq.basedir>
       <jboss-jts.version>4.17.13.Final</jboss-jts.version>
       <hornetq.version>2.4.7.Final</hornetq.version>
+      <openhft.core.version>1.4.9</openhft.core.version>
+      <openhft.affinity.version>3.0.6</openhft.affinity.version>
+      <openjdk.jmh.version>1.12</openjdk.jmh.version>
    </properties>
 
    <dependencies>
@@ -213,6 +216,34 @@
          <artifactId>jbossjts-jacorb</artifactId>
          <version>4.17.13.Final</version>
       </dependency>
+
+      <!-- ### Benchmark Tools -->
+      <!-- ### Java Latency Benchmarking Harness -->
+      <dependency>
+         <groupId>net.openhft</groupId>
+         <artifactId>chronicle-core</artifactId>
+         <version>${openhft.core.version}</version>
+         <!-- License: Apache 2.0 -->
+      </dependency>
+      <dependency>
+         <groupId>net.openhft</groupId>
+         <artifactId>affinity</artifactId>
+         <version>${openhft.affinity.version}</version>
+         <!-- License: LGPLv3-->
+      </dependency>
+      <!-- ### Java Microbenchmark Harness -->
+      <dependency>
+         <groupId>org.openjdk.jmh</groupId>
+         <artifactId>jmh-core</artifactId>
+         <version>${openjdk.jmh.version}</version>
+         <!-- License: GPLv2-->
+      </dependency>
+      <dependency>
+         <groupId>org.openjdk.jmh</groupId>
+         <artifactId>jmh-generator-annprocess</artifactId>
+         <version>${openjdk.jmh.version}</version>
+         <!-- License: GPLv2-->
+      </dependency>
    </dependencies>
 
    <build>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java
new file mode 100644
index 0000000..7f13639
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/JournalImplLatencyBench.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+
+import net.openhft.chronicle.core.jlbh.JLBH;
+import net.openhft.chronicle.core.jlbh.JLBHOptions;
+import net.openhft.chronicle.core.jlbh.JLBHTask;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+
+public class JournalImplLatencyBench implements JLBHTask {
+
+   private static final int FILE_SIZE = 1024 * 1024 * 1024;
+   private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
+   private static final int ITERATIONS = 100_000;
+   private static final int WARMUP_ITERATIONS = 20_000;
+   private static final int TARGET_THROUGHPUT = 50_000;
+   private static final int TESTS = 5;
+   private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
+   private static int ENCODED_SIZE = 8;
+   private static int CHUNK_BYTES = FILE_SIZE;
+   private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
+   private final SequentialFileFactory sequentialFileFactory;
+   private Journal journal;
+   private EncodingSupport encodingSupport;
+   private JLBH jlbh;
+   private long id;
+   public JournalImplLatencyBench(SequentialFileFactory sequentialFileFactory) {
+      this.sequentialFileFactory = sequentialFileFactory;
+   }
+
+   public static void main(String[] args) throws IOException {
+      final File journalDir = Files.createTempDirectory("seq_files").toFile();
+      journalDir.deleteOnExit();
+      final boolean buffered = false;
+      final int bufferSize = 4096;
+      final int bufferTimeout = 0;
+      final int maxIO = -1;
+      final boolean logRates = false;
+      final IOCriticalErrorListener criticalErrorListener = null;
+      final SequentialFileFactory sequentialFileFactory;
+      switch (JOURNAL_TYPE) {
+         case MAPPED:
+            sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
+            break;
+         case NIO:
+            sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
+            break;
+
+         default:
+            throw new AssertionError("!?");
+      }
+      final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new JournalImplLatencyBench(sequentialFileFactory));
+      new JLBH(lth).start();
+   }
+
+   @Override
+   public void init(JLBH jlbh) {
+      id = 0;
+      this.jlbh = jlbh;
+      // long + floating-point arithmetic: int division used to truncate the ratio before the 1.3 factor applied
+      int numFiles = (int) ((TOTAL_MESSAGES * 1024L + 512) / (double) FILE_SIZE * 1.3);
+      if (numFiles < 2) {
+         numFiles = 2;
+      }
+      this.journal = new JournalImpl(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
+      this.encodingSupport = NilEncodingSupport.Instance;
+      try {
+         journal.start();
+         journal.load(new ArrayList<RecordInfo>(), null, null);
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   public void run(long startTimeNS) {
+      id++;
+      try {
+         journal.appendAddRecord(id, (byte) 0, encodingSupport, false);
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+      jlbh.sample(System.nanoTime() - startTimeNS);
+   }
+
+   @Override
+   public void complete() {
+      try {
+         journal.stop();
+         for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
+            journalFile.deleteOnExit();
+         }
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   private enum JournalType {
+      MAPPED,
+      NIO
+   }
+
+   private enum NilEncodingSupport implements EncodingSupport {
+      Instance;
+
+      @Override
+      public int getEncodeSize() {
+         return ENCODED_SIZE;
+      }
+
+      @Override
+      public void encode(ActiveMQBuffer buffer) {
+         final int writerIndex = buffer.writerIndex();
+         for (int i = 0; i < ENCODED_SIZE; i++) {
+            buffer.writeByte((byte) 0);
+         }
+         buffer.writerIndex(writerIndex + ENCODED_SIZE);
+      }
+
+      @Override
+      public void decode(ActiveMQBuffer buffer) {
+         throw new UnsupportedOperationException();
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
new file mode 100644
index 0000000..833302d
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/EncodersBench.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Thread)
+@BenchmarkMode(value = {Mode.Throughput, Mode.SampleTime})
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+public class EncodersBench {
+
+   private static final int expectedEncoderSize = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(0);
+   private JournalInternalRecord record;
+   private ByteBuffer byteBuffer;
+   private AddJournalRecordEncoder addJournalRecordEncoder;
+   private ActiveMQBuffer outBuffer;
+
+   public static void main(String[] args) throws RunnerException {
+      final Options opt = new OptionsBuilder().include(EncodersBench.class.getSimpleName()).addProfiler(GCProfiler.class).warmupIterations(5).measurementIterations(5).forks(1).build();
+      new Runner(opt).run();
+   }
+
+   @Setup
+   public void init() {
+      this.byteBuffer = ByteBuffer.allocateDirect(expectedEncoderSize);
+      this.byteBuffer.order(ByteOrder.nativeOrder());
+      this.addJournalRecordEncoder = new AddJournalRecordEncoder();
+
+      this.record = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
+      this.record.setFileID(1);
+      this.record.setCompactCount((short) 1);
+      this.outBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(this.record.getEncodeSize(), this.record.getEncodeSize()).order(ByteOrder.nativeOrder()));
+   }
+
+   @Benchmark
+   public int encodeAligned() {
+      //Header
+      final long header = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedEncoderSize);
+      this.byteBuffer.putLong(0, header);
+      //FileId<CompactCount<Id<RecordType<RecordBytes
+      return addJournalRecordEncoder.on(byteBuffer, JournalRecordHeader.BYTES).fileId(1).compactCount(1).id(1L).recordType(1).noRecord().encodedLength();
+   }
+
+   @Benchmark
+   public int encodeUnaligned() {
+      outBuffer.clear();
+      record.encode(outBuffer);
+      return record.getEncodeSize();
+   }
+
+   @Benchmark
+   public int encodeUnalignedWithGarbage() {
+      outBuffer.clear();
+      final JournalAddRecord addRecord = new JournalAddRecord(true, 1, (byte) 1, ZeroEncodingSupport.Instance);
+      addRecord.setFileID(1);
+      addRecord.setCompactCount((short) 1);
+      addRecord.encode(outBuffer);
+      return addRecord.getEncodeSize();
+   }
+
+   public enum ZeroEncodingSupport implements EncodingSupport {
+      Instance;
+
+      @Override
+      public int getEncodeSize() {
+         return 0;
+      }
+
+      @Override
+      public void encode(ActiveMQBuffer buffer) {
+      }
+
+      @Override
+      public void decode(ActiveMQBuffer buffer) {
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java
new file mode 100644
index 0000000..91bc6e5
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournal.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+
+final class GcFreeJournal extends JournalImpl {
+
+   private final AddJournalRecordEncoder addJournalRecordEncoder = new AddJournalRecordEncoder();
+   //TODO replace with thread local pools if not single threaded!
+   private ByteBuffer journalRecordBytes = null;
+
+   GcFreeJournal(final int fileSize,
+                 final int minFiles,
+                 final int poolSize,
+                 final int compactMinFiles,
+                 final int compactPercentage,
+                 final SequentialFileFactory fileFactory,
+                 final String filePrefix,
+                 final String fileExtension,
+                 final int maxAIO) {
+      super(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
+   }
+
+   public static int align(final int value, final int alignment) {
+      return (value + (alignment - 1)) & ~(alignment - 1);
+   }
+
+   public void appendAddRecord(final long id,
+                               final int recordType,
+                               final ByteBuffer encodedRecord,
+                               final int offset,
+                               final int length,
+                               final boolean sync) throws Exception {
+      final int expectedLength = JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(length);
+      final int alignedLength = align(expectedLength, 8);
+      switchFileIfNecessary(alignedLength);
+      final JournalFile currentFile = getCurrentFile();
+      final int fileId = currentFile.getRecordID();
+      if (this.journalRecordBytes == null || this.journalRecordBytes.capacity() < alignedLength) {
+         final int newPooledLength = align(alignedLength, 4096);
+         //TODO add limits or warnings in case the requested size is much bigger
+         this.journalRecordBytes = ByteBuffer.allocateDirect(newPooledLength);
+         this.journalRecordBytes.order(ByteOrder.nativeOrder());
+      }
+      final long journalRecordHeader = JournalRecordHeader.makeHeader(JournalRecordTypes.ADD_JOURNAL, expectedLength);
+      this.journalRecordBytes.putLong(0, journalRecordHeader);
+      //use natural stride while encoding: FileId<CompactCount<Id<RecordType<RecordBytes
+      this.addJournalRecordEncoder.on(this.journalRecordBytes, JournalRecordHeader.BYTES).fileId(fileId).compactCount(0).id(id).recordType(recordType).record(encodedRecord, offset, length);
+      final SequentialFile sequentialFile = currentFile.getFile();
+      try {
+         this.journalRecordBytes.limit(alignedLength);
+         sequentialFile.writeDirect(this.journalRecordBytes, sync);
+      }
+      finally {
+         this.journalRecordBytes.clear();
+      }
+      //TODO AVOID INDEXING WITH CONCURRENT MAP!
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournalLatencyBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournalLatencyBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournalLatencyBench.java
new file mode 100644
index 0000000..a859006
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/GcFreeJournalLatencyBench.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.util.ArrayList;
+
+import net.openhft.chronicle.core.jlbh.JLBH;
+import net.openhft.chronicle.core.jlbh.JLBHOptions;
+import net.openhft.chronicle.core.jlbh.JLBHTask;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+
+public class GcFreeJournalLatencyBench implements JLBHTask {
+
+   private static final int FILE_SIZE = JournalImpl.SIZE_HEADER + (1024 * 1024 * 1024);
+   private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
+   private static final int ITERATIONS = 100_000;
+   private static final int WARMUP_ITERATIONS = 20_000;
+   private static final int TARGET_THROUGHPUT = 500_000;
+   private static final int TESTS = 5;
+   private static int TOTAL_MESSAGES = (ITERATIONS * TESTS + WARMUP_ITERATIONS);
+   private static int ENCODED_SIZE = 8;
+   private static int CHUNK_BYTES = FILE_SIZE;
+   private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
+   private final SequentialFileFactory sequentialFileFactory;
+   private GcFreeJournal journal;
+   private JLBH jlbh;
+   private long id;
+   private ByteBuffer encodedRecord;
+
+   public GcFreeJournalLatencyBench(SequentialFileFactory sequentialFileFactory) {
+      this.sequentialFileFactory = sequentialFileFactory;
+   }
+
+   public static void main(String[] args) throws IOException {
+      final File journalDir = Files.createTempDirectory("seq_files").toFile();
+      journalDir.deleteOnExit();
+      final boolean buffered = false;
+      final int bufferSize = 4096;
+      final int bufferTimeout = 0;
+      final int maxIO = -1;
+      final boolean logRates = false;
+      final IOCriticalErrorListener criticalErrorListener = null;
+      final SequentialFileFactory sequentialFileFactory;
+      switch (JOURNAL_TYPE) {
+         case MAPPED:
+            sequentialFileFactory = new MappedSequentialFileFactory(journalDir, criticalErrorListener).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
+            break;
+         case NIO:
+            sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
+            break;
+
+         default:
+            throw new AssertionError("!?");
+      }
+      final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new GcFreeJournalLatencyBench(sequentialFileFactory));
+      new JLBH(lth).start();
+   }
+
+   @Override
+   public void init(JLBH jlbh) {
+      id = 0;
+      this.jlbh = jlbh;
+      final int expectedMaxSize = GcFreeJournal.align(JournalRecordHeader.BYTES + AddJournalRecordEncoder.expectedSize(ENCODED_SIZE), 8);
+      int numFiles = (int) ((TOTAL_MESSAGES * expectedMaxSize + 512) / FILE_SIZE * 1.3);
+      if (numFiles < 2) {
+         numFiles = 2;
+      }
+      this.encodedRecord = ByteBuffer.allocateDirect(ENCODED_SIZE);
+      this.encodedRecord.order(ByteOrder.nativeOrder());
+      this.journal = new GcFreeJournal(FILE_SIZE, numFiles, numFiles, 0, 0, sequentialFileFactory, "activemq-data", "amq", Integer.MAX_VALUE);
+      try {
+         journal.start();
+         journal.load(new ArrayList<RecordInfo>(), null, null);
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+
+   }
+
+   @Override
+   public void run(long startTimeNS) {
+      id++;
+      try {
+         journal.appendAddRecord(id, (byte) 0, encodedRecord, 0, ENCODED_SIZE, false);
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+      jlbh.sample(System.nanoTime() - startTimeNS);
+   }
+
+   @Override
+   public void complete() {
+      try {
+         journal.stop();
+         for (File journalFile : sequentialFileFactory.getDirectory().listFiles()) {
+            journalFile.deleteOnExit();
+         }
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   private enum JournalType {
+      MAPPED,
+      NIO
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalAddRecordEncoder.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalAddRecordEncoder.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalAddRecordEncoder.java
new file mode 100644
index 0000000..1096c63
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalAddRecordEncoder.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
+
+import java.nio.ByteBuffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * This is not a flyweight but an encoder: the fields must be written in this sequence:
+ * FileId<CompactCount<Id<RecordType<RecordBytes
+ */
+final class AddJournalRecordEncoder {
+
+   private static final int FILE_ID_OFFSET = 0;
+   private static final int COMPACT_COUNT_OFFSET = FILE_ID_OFFSET + 4;
+   private static final int ID_OFFSET = COMPACT_COUNT_OFFSET + 4;
+   private static final int RECORD_TYPE_OFFSET = ID_OFFSET + 8;
+   public static final int BLOCK_SIZE = RECORD_TYPE_OFFSET + 4;
+
+   private ByteBuffer bytes;
+   private int offset;
+   private int limit;
+
+   public static int expectedSize(int recordBytes) {
+      return BLOCK_SIZE + 4 + recordBytes;
+   }
+
+   public ByteBuffer bytes() {
+      return bytes;
+   }
+
+   public int offset() {
+      return this.offset;
+   }
+
+   public int limit() {
+      return this.limit;
+   }
+
+   public void limit(int limit) {
+      this.limit = limit;
+   }
+
+   public AddJournalRecordEncoder on(ByteBuffer bytes, int offset) {
+      this.bytes = bytes;
+      this.offset = offset;
+      this.limit = offset + BLOCK_SIZE;
+      return this;
+   }
+
+   public AddJournalRecordEncoder fileId(int value) {
+      this.bytes.putInt(offset + FILE_ID_OFFSET, value);
+      return this;
+   }
+
+   public AddJournalRecordEncoder compactCount(int value) {
+      this.bytes.putInt(offset + COMPACT_COUNT_OFFSET, value);
+      return this;
+   }
+
+   public AddJournalRecordEncoder id(long value) {
+      this.bytes.putLong(offset + ID_OFFSET, value);
+      return this;
+   }
+
+   public AddJournalRecordEncoder recordType(int value) {
+      this.bytes.putInt(offset + RECORD_TYPE_OFFSET, value); // slot is 4 bytes wide (BLOCK_SIZE = RECORD_TYPE_OFFSET + 4); putLong spilled into the record-length field
+      return this;
+   }
+
+   public AddJournalRecordEncoder noRecord() {
+      this.bytes.putInt(this.limit, 0);
+      this.limit += 4;
+      return this;
+   }
+
+   public AddJournalRecordEncoder record(final ByteBuffer recordBytes, final int recordOffset, final int recordLength) {
+      this.bytes.putInt(this.limit, recordLength);
+      final long dstAddr = PlatformDependent.directBufferAddress(bytes) + this.limit + 4;
+      final long srcAddr = PlatformDependent.directBufferAddress(recordBytes) + recordOffset;
+      PlatformDependent.copyMemory(srcAddr, dstAddr, recordLength);
+      this.limit += (4 + recordLength);
+      return this;
+   }
+
+   public int encodedLength() {
+      return this.limit - this.offset;
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java
new file mode 100644
index 0000000..98b205e
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordHeader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
+
+final class JournalRecordHeader {
+
+   public static final int BYTES = 8;
+
+   public static long makeHeader(final int journalRecordTypeId, final int length) {
+      return ((journalRecordTypeId & 0xFFFF_FFFFL) << 32) | (length & 0xFFFF_FFFFL);
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java
new file mode 100644
index 0000000..ece9b3a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/journal/gcfree/JournalRecordTypes.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.benchmarks.journal.gcfree;
+
+/**
+ * Created by developer on 18/06/16.
+ */
+final class JournalRecordTypes {
+
+   public static final int ADD_JOURNAL = 11;
+
+   private JournalRecordTypes() {
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/36555a10/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java
new file mode 100644
index 0000000..0a758db
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/benchmarks/sequentialfile/SequentialFileLatencyBench.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.benchmarks.sequentialfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import net.openhft.chronicle.core.jlbh.JLBH;
+import net.openhft.chronicle.core.jlbh.JLBHOptions;
+import net.openhft.chronicle.core.jlbh.JLBHTask;
+import org.apache.activemq.artemis.core.io.DummyCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+
+public final class SequentialFileLatencyBench implements JLBHTask {
+
+   private static final JournalType JOURNAL_TYPE = JournalType.MAPPED;
+   //NOTE: SUPPORTED ONLY ON *NIX
+   private static final boolean SHM = false;
+   private static final int JOURNAL_RECORD_SIZE = 8;
+   private static final int ITERATIONS = 100_000;
+   private static final int WARMUP_ITERATIONS = 20_000;
+   private static final int TARGET_THROUGHPUT = 500_000;
+   private static final int TESTS = 5;
+   private static int CHUNK_BYTES = 4096 * 1024 * 16;
+   private static int OVERLAP_BYTES = CHUNK_BYTES / 4;
+   private final SequentialFileFactory sequentialFileFactory;
+   private SequentialFile sequentialFile;
+   private ByteBuffer message;
+   private JLBH jlbh;
+   public SequentialFileLatencyBench(SequentialFileFactory sequentialFileFactory) {
+      this.sequentialFileFactory = sequentialFileFactory;
+   }
+
+   public static void main(String[] args) throws IOException {
+      final File journalDir;
+      if (SHM) {
+         journalDir = Files.createDirectory(Paths.get("/dev/shm/seq_files")).toFile();
+      }
+      else {
+         journalDir = Files.createTempDirectory("seq_files").toFile();
+      }
+      journalDir.deleteOnExit();
+      final boolean buffered = false;
+      final int bufferSize = 4096;
+      final int bufferTimeout = 0;
+      final int maxIO = -1;
+      final boolean logRates = false;
+      final IOCriticalErrorListener criticalErrorListener = null;
+      final SequentialFileFactory sequentialFileFactory;
+      switch (JOURNAL_TYPE) {
+         case MAPPED:
+            sequentialFileFactory = new MappedSequentialFileFactory(journalDir).chunkBytes(CHUNK_BYTES).overlapBytes(OVERLAP_BYTES);
+            break;
+         case NIO:
+            sequentialFileFactory = new NIOSequentialFileFactory(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, criticalErrorListener);
+            break;
+         default:
+            throw new AssertionError("!?");
+      }
+      final JLBHOptions lth = new JLBHOptions().warmUpIterations(WARMUP_ITERATIONS).iterations(ITERATIONS).throughput(TARGET_THROUGHPUT).runs(TESTS).recordOSJitter(true).accountForCoordinatedOmmission(true).jlbhTask(new SequentialFileLatencyBench(sequentialFileFactory));
+      new JLBH(lth).start();
+   }
+
+   @Override
+   public void init(JLBH jlbh) {
+      this.jlbh = jlbh;
+      this.sequentialFile = this.sequentialFileFactory.createSequentialFile(Long.toString(System.nanoTime()));
+      try {
+         this.sequentialFile.open(-1, false);
+         final File file = this.sequentialFile.getJavaFile();
+         file.deleteOnExit();
+         System.out.println("sequentialFile: " + file);
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+      this.message = this.sequentialFileFactory.allocateDirectBuffer(JOURNAL_RECORD_SIZE).order(ByteOrder.nativeOrder());
+
+   }
+
+   @Override
+   public void run(long startTimeNS) {
+      message.position(0);
+      try {
+         sequentialFile.writeDirect(message, false, DummyCallback.getInstance());
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+      jlbh.sample(System.nanoTime() - startTimeNS);
+   }
+
+   @Override
+   public void complete() {
+      sequentialFileFactory.releaseDirectBuffer(message);
+      try {
+         sequentialFile.close();
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   private enum JournalType {
+      MAPPED,
+      NIO
+   }
+}
\ No newline at end of file


Mime
View raw message