activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [02/13] activemq-artemis git commit: ARTEMIS-832 Openwire was ignoring data syncs.
Date Fri, 04 Nov 2016 13:29:05 GMT
ARTEMIS-832 Openwire was ignoring data syncs.

I'm also adding the possibility of sync on libaio, and not only relay on write-cache


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bcbbc868
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bcbbc868
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bcbbc868

Branch: refs/heads/ARTEMIS-780
Commit: bcbbc86856cbb9679ce6886852797b3360605730
Parents: 749b831
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Nov 1 21:38:02 2016 -0400
Committer: jbertram <jbertram@apache.com>
Committed: Thu Nov 3 20:35:15 2016 -0500

----------------------------------------------------------------------
 .../activemq/artemis/cli/commands/Create.java   |   8 +-
 .../cli/commands/util/SyncCalculation.java      |   7 +-
 .../artemis/cli/commands/etc/broker.xml         |   2 +
 .../apache/activemq/cli/test/ArtemisTest.java   |   8 +-
 .../config/ActiveMQDefaultConfiguration.java    |   7 +
 .../client/impl/ClientSessionFactoryImpl.java   |   3 +-
 .../store/file/JDBCSequentialFileFactory.java   |  12 +
 .../artemis/core/io/AbstractSequentialFile.java |  22 --
 .../core/io/AbstractSequentialFileFactory.java  |  15 ++
 .../artemis/core/io/SequentialFileFactory.java  |   4 +
 .../artemis/core/io/aio/AIOSequentialFile.java  |   8 +-
 .../core/io/aio/AIOSequentialFileFactory.java   |   2 +-
 .../core/io/mapped/MappedSequentialFile.java    |  19 +-
 .../io/mapped/MappedSequentialFileFactory.java  |  14 +-
 .../artemis/core/io/nio/NIOSequentialFile.java  |  65 +----
 artemis-native/bin/libartemis-native-64.so      | Bin 25003 -> 28687 bytes
 ...che_activemq_artemis_jlibaio_LibaioContext.c |  11 +-
 .../activemq/artemis/jlibaio/LibaioContext.java |  12 +-
 .../artemis/jlibaio/test/LibaioTest.java        |  10 +-
 .../jlibaio/test/OpenCloseContextTest.java      |   8 +-
 .../amqp/broker/AMQPConnectionCallback.java     |   2 +-
 .../amqp/broker/AMQPSessionCallback.java        |   9 +-
 .../protocol/mqtt/MQTTConnectionManager.java    |   3 +-
 .../protocol/openwire/OpenWireConnection.java   | 107 +++++----
 .../core/protocol/openwire/amq/AMQSession.java  |   4 +-
 .../protocol/stomp/StompProtocolManager.java    |   4 +-
 .../artemis/core/config/Configuration.java      |  17 ++
 .../core/config/impl/ConfigurationImpl.java     |  13 +
 .../deployers/impl/FileConfigurationParser.java |   2 +
 .../impl/journal/JournalStorageManager.java     |   4 +
 .../core/impl/ActiveMQPacketHandler.java        |   5 +-
 .../artemis/core/server/ActiveMQServer.java     |   6 +-
 .../core/server/impl/ActiveMQServerImpl.java    |   9 +-
 .../resources/schema/artemis-configuration.xsd  |   8 +
 .../core/config/impl/FileConfigurationTest.java |   2 +
 .../artemis/tests/util/ActiveMQTestBase.java    |   2 +-
 .../resources/ConfigurationTest-full-config.xml |   1 +
 docs/user-manual/en/configuration-index.md      |   1 +
 docs/user-manual/en/persistence.md              |   4 +
 .../integration/persistence/SyncSendTest.java   | 235 +++++++++++++++++++
 .../vertx/ActiveMQVertxUnitTest.java            |   5 +-
 .../impl/fakes/FakeSequentialFileFactory.java   |  10 +
 42 files changed, 505 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index ecb9e49..be788cd 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -213,6 +213,9 @@ public class Create extends InputAbstract {
    @Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.")
    boolean noHornetQAcceptor;
 
+   @Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
+   boolean noJournalSync;
+
    boolean IS_WINDOWS;
 
    boolean IS_CYGWIN;
@@ -567,6 +570,7 @@ public class Create extends InputAbstract {
          filters.put("${web.protocol}", "http");
          filters.put("${extra.web.attributes}", "");
       }
+      filters.put("${fsync}", String.valueOf(!noJournalSync));
       filters.put("${user}", System.getProperty("user.name", ""));
       filters.put("${default.port}", String.valueOf(defaultPort + portOffset));
       filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset));
@@ -776,7 +780,7 @@ public class Create extends InputAbstract {
             System.out.println("");
             System.out.println("Auto tuning journal ...");
 
-            long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio);
+            long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio);
             long nanoseconds = SyncCalculation.toNanos(time, writes);
             double writesPerMillisecond = (double) writes / (double) time;
 
@@ -807,7 +811,7 @@ public class Create extends InputAbstract {
          // forcing NIO
          return false;
       } else if (LibaioContext.isLoaded()) {
-         try (LibaioContext context = new LibaioContext(1, true)) {
+         try (LibaioContext context = new LibaioContext(1, true, true)) {
             File tmpFile = new File(directory, "validateAIO.bin");
             boolean supportsLibaio = true;
             try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index 468eabf..315ebdc 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -46,8 +46,9 @@ public class SyncCalculation {
                                int blocks,
                                int tries,
                                boolean verbose,
+                               boolean fsync,
                                boolean aio) throws Exception {
-      SequentialFileFactory factory = newFactory(datafolder, aio);
+      SequentialFileFactory factory = newFactory(datafolder, fsync, aio);
       SequentialFile file = factory.createSequentialFile("test.tmp");
 
       try {
@@ -149,9 +150,9 @@ public class SyncCalculation {
       return timeWait;
    }
 
-   private static SequentialFileFactory newFactory(File datafolder, boolean aio) {
+   private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) {
       if (aio && LibaioContext.isLoaded()) {
-         SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1);
+         SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
          factory.start();
          ((AIOSequentialFileFactory) factory).disableBufferReuse();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index fe28246..58c103c 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -45,6 +45,8 @@ under the License.
 
       <large-messages-directory>${data.dir}/large-messages</large-messages-directory>
 
+      <journal-datasync>${fsync}</journal-datasync>
+
       <journal-min-files>2</journal-min-files>
 
       <journal-pool-files>-1</journal-pool-files>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index ba78fb2..2359f1d 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -129,7 +129,7 @@ public class ArtemisTest {
    public void testSync() throws Exception {
       int writes = 20;
       int tries = 10;
-      long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true);
+      long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true);
       System.out.println();
       System.out.println("TotalAvg = " + totalAvg);
       long nanoTime = SyncCalculation.toNanos(totalAvg, writes);
@@ -144,7 +144,7 @@ public class ArtemisTest {
       Run.setEmbedded(true);
       //instance1: default using http
       File instance1 = new File(temporaryFolder.getRoot(), "instance1");
-      Artemis.main("create", instance1.getAbsolutePath(), "--silent");
+      Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
       File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml");
       Assert.assertTrue(bootstrapFile.exists());
       Document config = parseXml(bootstrapFile);
@@ -163,7 +163,7 @@ public class ArtemisTest {
 
       //instance2: https
       File instance2 = new File(temporaryFolder.getRoot(), "instance2");
-      Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1");
+      Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--no-fsync");
       bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml");
       Assert.assertTrue(bootstrapFile.exists());
       config = parseXml(bootstrapFile);
@@ -184,7 +184,7 @@ public class ArtemisTest {
 
       //instance3: https with clientAuth
       File instance3 = new File(temporaryFolder.getRoot(), "instance3");
-      Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2");
+      Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2", "--no-fsync");
       bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml");
       Assert.assertTrue(bootstrapFile.exists());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 04d06c0..b952430 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -130,6 +130,9 @@ public final class ActiveMQDefaultConfiguration {
    // true means that the server will use the file based journal for persistence.
    private static boolean DEFAULT_PERSISTENCE_ENABLED = true;
 
+   // true means that the server will sync data files
+   private static boolean DEFAULT_JOURNAL_DATASYNC = true;
+
    // Maximum number of threads to use for the scheduled thread pool
    private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
 
@@ -460,6 +463,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_PERSISTENCE_ENABLED;
    }
 
+   public static boolean isDefaultJournalDatasync() {
+      return DEFAULT_JOURNAL_DATASYNC;
+   }
+
    /**
     * Maximum number of threads to use for the scheduled thread pool
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index d781fff..d2d9886 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    private final long connectionTTL;
 
-   private final Set<ClientSessionInternal> sessions = new HashSet<>();
+   private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<>();
 
    private final Object createSessionLock = new Object();
 
@@ -506,6 +506,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
          logger.debug(e1.getMessage(), e1);
       } catch (Throwable t) {
+         logger.warn(t.getMessage(), t);
          //for anything else just close so clients are un blocked
          close();
          throw t;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index cafb261..66f00ec 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -61,6 +61,18 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
    }
 
    @Override
+   public SequentialFileFactory setDatasync(boolean enabled) {
+
+      // noop
+      return this;
+   }
+
+   @Override
+   public boolean isDatasync() {
+      return false;
+   }
+
+   @Override
    public synchronized void start() {
       try {
          if (!started) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index 0c6dcdf..cd15246 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -21,9 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -59,11 +57,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
    protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
 
    /**
-    * Used for asynchronous writes
-    */
-   protected final Executor writerExecutor;
-
-   /**
     * @param file
     * @param directory
     */
@@ -75,7 +68,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
       this.file = new File(directory, file);
       this.directory = directory;
       this.factory = factory;
-      this.writerExecutor = writerExecutor;
    }
 
    // Public --------------------------------------------------------
@@ -166,20 +158,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
     */
    @Override
    public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
-      final CountDownLatch donelatch = new CountDownLatch(1);
-
-      if (writerExecutor != null) {
-         writerExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-               donelatch.countDown();
-            }
-         });
-
-         while (!donelatch.await(60, TimeUnit.SECONDS)) {
-            ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
-         }
-      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index 6e61c86..5aa723d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -52,6 +52,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
 
    protected final int maxIO;
 
+   protected boolean dataSync = true;
+
    private final IOCriticalErrorListener critialErrorListener;
 
    /**
@@ -81,6 +83,19 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
       this.maxIO = maxIO;
    }
 
+
+   @Override
+   public SequentialFileFactory setDatasync(boolean enabled) {
+      this.dataSync = enabled;
+      return this;
+   }
+
+   @Override
+   public boolean isDatasync() {
+      return dataSync;
+   }
+
+
    @Override
    public void stop() {
       if (timedBuffer != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index 81203cf..2229edf 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -95,4 +95,8 @@ public interface SequentialFileFactory {
    void createDirs() throws Exception;
 
    void flush();
+
+   SequentialFileFactory setDatasync(boolean enabled);
+
+   boolean isDatasync();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index a0d20d2..874e411 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -97,7 +97,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public SequentialFile cloneFile() {
-      return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor);
+      return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), null);
    }
 
    @Override
@@ -214,11 +214,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
 
       AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes);
       runnableCallback.initWrite(positionToWrite, bytesToWrite);
-      if (writerExecutor != null) {
-         writerExecutor.execute(runnableCallback);
-      } else {
-         runnableCallback.run();
-      }
+      runnableCallback.run();
    }
 
    AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index da0d079..57d18f5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -211,7 +211,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
       if (running.compareAndSet(false, true)) {
          super.start();
 
-         this.libaioContext = new LibaioContext(maxIO, true);
+         this.libaioContext = new LibaioContext(maxIO, true, dataSync);
 
          this.running.set(true);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
index 522dbd1..017948b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java
@@ -49,12 +49,15 @@ final class MappedSequentialFile implements SequentialFile {
    private String fileName;
    private MappedFile mappedFile;
    private ActiveMQBuffer pooledActiveMQBuffer;
+   private final MappedSequentialFileFactory factory;
 
-   MappedSequentialFile(final File directory,
+   MappedSequentialFile(MappedSequentialFileFactory factory,
+                        final File directory,
                         final File file,
                         final long chunkBytes,
                         final long overlapBytes,
                         final IOCriticalErrorListener criticalErrorListener) {
+      this.factory = factory;
       this.directory = directory;
       this.file = file;
       this.absoluteFile = null;
@@ -155,7 +158,7 @@ final class MappedSequentialFile implements SequentialFile {
          final int readableBytes = writerIndex - readerIndex;
          if (readableBytes > 0) {
             this.mappedFile.write(byteBuf, readerIndex, readableBytes);
-            if (sync) {
+            if (factory.isDatasync() && sync) {
                this.mappedFile.force();
             }
          }
@@ -178,7 +181,7 @@ final class MappedSequentialFile implements SequentialFile {
       final int readableBytes = writerIndex - readerIndex;
       if (readableBytes > 0) {
          this.mappedFile.write(byteBuf, readerIndex, readableBytes);
-         if (sync) {
+         if (factory.isDatasync() && sync) {
             this.mappedFile.force();
          }
       }
@@ -209,7 +212,7 @@ final class MappedSequentialFile implements SequentialFile {
          final int readableBytes = writerIndex - readerIndex;
          if (readableBytes > 0) {
             this.mappedFile.write(byteBuf, readerIndex, readableBytes);
-            if (sync) {
+            if (factory.isDatasync() && sync) {
                this.mappedFile.force();
             }
          }
@@ -235,7 +238,7 @@ final class MappedSequentialFile implements SequentialFile {
       final int readableBytes = writerIndex - readerIndex;
       if (readableBytes > 0) {
          this.mappedFile.write(byteBuf, readerIndex, readableBytes);
-         if (sync) {
+         if (factory.isDatasync() && sync) {
             this.mappedFile.force();
          }
       }
@@ -253,7 +256,7 @@ final class MappedSequentialFile implements SequentialFile {
          final int remaining = limit - position;
          if (remaining > 0) {
             this.mappedFile.write(bytes, position, remaining);
-            if (sync) {
+            if (factory.isDatasync() && sync) {
                this.mappedFile.force();
             }
          }
@@ -275,7 +278,7 @@ final class MappedSequentialFile implements SequentialFile {
       final int remaining = limit - position;
       if (remaining > 0) {
          this.mappedFile.write(bytes, position, remaining);
-         if (sync) {
+         if (factory.isDatasync() && sync) {
             this.mappedFile.force();
          }
       }
@@ -381,7 +384,7 @@ final class MappedSequentialFile implements SequentialFile {
    @Override
    public SequentialFile cloneFile() {
       checkIsNotOpen();
-      return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
+      return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 23af0b6..8ccef74 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -37,6 +37,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
    private final IOCriticalErrorListener criticalErrorListener;
    private long chunkBytes;
    private long overlapBytes;
+   private boolean useDataSync;
 
    public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
       this.directory = directory;
@@ -72,7 +73,18 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
 
    @Override
    public SequentialFile createSequentialFile(String fileName) {
-      return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+      return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
+   }
+
+   @Override
+   public SequentialFileFactory setDatasync(boolean enabled) {
+      this.useDataSync = enabled;
+      return this;
+   }
+
+   @Override
+   public boolean isDatasync() {
+      return useDataSync;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 40e0544..2887d25 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -35,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 public final class NIOSequentialFile extends AbstractSequentialFile {
 
@@ -43,11 +40,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
 
    private RandomAccessFile rfile;
 
-   /**
-    * The write semaphore here is only used when writing asynchronously
-    */
-   private Semaphore maxIOSemaphore;
-
    private final int defaultMaxIO;
 
    private int maxIO;
@@ -99,11 +91,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
          factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
          throw e;
       }
-
-      if (writerExecutor != null && useExecutor) {
-         maxIOSemaphore = new Semaphore(maxIO);
-         this.maxIO = maxIO;
-      }
    }
 
    @Override
@@ -124,6 +111,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
          factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
          throw e;
       }
+      channel.force(true);
 
       fileSize = channel.size();
    }
@@ -138,13 +126,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
    public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
       super.close();
 
-      if (maxIOSemaphore != null) {
-         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) {
-            ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
-         }
-      }
-
-      maxIOSemaphore = null;
       try {
          if (channel != null) {
             channel.close();
@@ -202,7 +183,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public void sync() throws IOException {
-      if (channel != null) {
+      if (factory.isDatasync() && channel != null) {
          try {
             channel.force(false);
          } catch (ClosedChannelException e) {
@@ -250,7 +231,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
 
    @Override
    public SequentialFile cloneFile() {
-      return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
+      return new NIOSequentialFile(factory, directory, getFileName(), maxIO, null);
    }
 
    @Override
@@ -298,40 +279,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
 
       position.addAndGet(bytes.limit());
 
-      if (maxIOSemaphore == null || callback == null) {
-         // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
-         try {
-            doInternalWrite(bytes, sync, callback);
-         } catch (ClosedChannelException e) {
-            throw e;
-         } catch (IOException e) {
-            factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         }
-      } else {
-         // This is a flow control on writing, just like maxAIO on libaio
-         maxIOSemaphore.acquire();
-
-         writerExecutor.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  try {
-                     doInternalWrite(bytes, sync, callback);
-                  } catch (ClosedChannelException e) {
-                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
-                  } catch (IOException e) {
-                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
-                     factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
-                     callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
-                  } catch (Throwable e) {
-                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
-                     callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
-                  }
-               } finally {
-                  maxIOSemaphore.release();
-               }
-            }
-         });
+      try {
+         doInternalWrite(bytes, sync, callback);
+      } catch (ClosedChannelException e) {
+         throw e;
+      } catch (IOException e) {
+         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/bin/libartemis-native-64.so
----------------------------------------------------------------------
diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so
old mode 100644
new mode 100755
index 95a5451..8cbe851
Binary files a/artemis-native/bin/libartemis-native-64.so and b/artemis-native/bin/libartemis-native-64.so differ

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
index 74545fc..3f7c213 100644
--- a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
+++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c
@@ -536,7 +536,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
 }
 
 JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
-  (JNIEnv * env, jobject thisObject, jobject contextPointer) {
+  (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) {
 
     #ifdef DEBUG
        fprintf (stdout, "Running blockedPoll\n");
@@ -553,6 +553,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
 
     short running = 1;
 
+    int lastFile = -1;
+
     while (running) {
 
         int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
@@ -574,6 +576,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
            fflush(stdout);
         #endif
 
+        lastFile = -1;
+
         for (i = 0; i < result; i++)
         {
             #ifdef DEBUG
@@ -593,6 +597,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
                break;
             }
 
+            if (useFdatasync && lastFile != iocbp->aio_fildes) {
+                lastFile = iocbp->aio_fildes;
+                fdatasync(lastFile);
+            }
+
 
             int eventResult = (int)event->res;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
index 8049a97..cdaea55 100644
--- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
+++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
@@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
     * <br>
     * Or else the native module won't be loaded because of version mismatches
     */
-   private static final int EXPECTED_NATIVE_VERSION = 6;
+   private static final int EXPECTED_NATIVE_VERSION = 7;
 
    private static boolean loaded = false;
 
@@ -146,6 +146,8 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
 
    final int queueSize;
 
+   final boolean useFdatasync;
+
    /**
     * The queue size here will use resources defined on the kernel parameter
     * <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
@@ -153,11 +155,13 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
     * @param queueSize    the size to be initialize on libaio
     *                     io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
     * @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
+    * @param useFdatasync should use fdatasync before calling callbacks.
     */
-   public LibaioContext(int queueSize, boolean useSemaphore) {
+   public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
       try {
          contexts.incrementAndGet();
          this.ioContext = newContext(queueSize);
+         this.useFdatasync = useFdatasync;
       } catch (Exception e) {
          throw e;
       }
@@ -349,7 +353,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
     */
    public void poll() {
       if (!closed.get()) {
-         blockedPoll(ioContext);
+         blockedPoll(ioContext, useFdatasync);
       }
    }
 
@@ -436,7 +440,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
    /**
     * This method will block as long as the context is open.
     */
-   native void blockedPoll(ByteBuffer libaioContext);
+   native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
 
    static native int getNativeVersion();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
index 7f98f0d..1013966 100644
--- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
+++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java
@@ -54,7 +54,7 @@ public class LibaioTest {
          parent.mkdirs();
 
          boolean failed = false;
-         try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
+         try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
             fileDescriptor.fallocate(4 * 1024);
          } catch (Exception e) {
             e.printStackTrace();
@@ -80,7 +80,7 @@ public class LibaioTest {
 
    @Before
    public void setUpFactory() {
-      control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true);
+      control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
    }
 
    @After
@@ -532,10 +532,10 @@ public class LibaioTest {
       boolean exceptionThrown = false;
 
       control.close();
-      control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false);
+      control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
       try {
          // There is no space for a queue this huge, the native layer should throw the exception
-         LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false);
+         LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
       } catch (RuntimeException e) {
          exceptionThrown = true;
       }
@@ -630,7 +630,7 @@ public class LibaioTest {
 
    @Test
    public void testBlockedCallback() throws Exception {
-      final LibaioContext blockedContext = new LibaioContext(500, true);
+      final LibaioContext blockedContext = new LibaioContext(500, true, true);
       Thread t = new Thread() {
          @Override
          public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
index c04bff4..b515663 100644
--- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
+++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java
@@ -53,7 +53,7 @@ public class OpenCloseContextTest {
 
       for (int i = 0; i < 10; i++) {
          System.out.println("#test " + i);
-         final LibaioContext control = new LibaioContext<>(5, true);
+         final LibaioContext control = new LibaioContext<>(5, true, true);
          Thread t = new Thread() {
             @Override
             public void run() {
@@ -111,7 +111,7 @@ public class OpenCloseContextTest {
 
       for (int i = 0; i < 10; i++) {
          System.out.println("#test " + i);
-         final LibaioContext control = new LibaioContext<>(5, true);
+         final LibaioContext control = new LibaioContext<>(5, true, true);
          Thread t = new Thread() {
             @Override
             public void run() {
@@ -164,9 +164,9 @@ public class OpenCloseContextTest {
 
    @Test
    public void testCloseAndStart() throws Exception {
-      final LibaioContext control2 = new LibaioContext<>(5, true);
+      final LibaioContext control2 = new LibaioContext<>(5, true, true);
 
-      final LibaioContext control = new LibaioContext<>(5, true);
+      final LibaioContext control = new LibaioContext<>(5, true, true);
       control.close();
       control.poll();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 4ced546..24c625c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -177,7 +177,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
    }
 
    public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
-      return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor);
+      return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
    }
 
    public void sendSASLSupported() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 66c7b4b..acbb2e9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -81,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private ServerSession serverSession;
 
+   private final OperationContext operationContext;
+
    private AMQPSessionContext protonSession;
 
    private final Executor closeExecutor;
@@ -91,12 +94,14 @@ public class AMQPSessionCallback implements SessionCallback {
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
                               Connection transportConnection,
-                              Executor executor) {
+                              Executor executor,
+                              OperationContext operationContext) {
       this.protonSPI = protonSPI;
       this.manager = manager;
       this.connection = connection;
       this.transportConnection = transportConnection;
       this.closeExecutor = executor;
+      this.operationContext = operationContext;
    }
 
    @Override
@@ -151,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback {
                                                         false, // boolean autoCommitAcks,
                                                         false, // boolean preAcknowledge,
                                                         true, //boolean xa,
-                                                        (String) null, this, true);
+                                                        (String) null, this, true, operationContext);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index 3a1f447..ce65648 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -96,7 +96,8 @@ public class MQTTConnectionManager {
       String id = UUIDGenerator.getInstance().generateStringUUID();
       ActiveMQServer server = session.getServer();
 
-      ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
+      ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null,
+                                                         session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext());
       return (ServerSessionImpl) serverSession;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 33418e6..8dc0b34 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -119,12 +120,15 @@ import org.apache.activemq.state.SessionState;
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
+import org.jboss.logging.Logger;
 
 /**
  * Represents an activemq connection.
  */
 public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
 
+   private static final Logger logger = Logger.getLogger(OpenWireConnection.class);
+
    private static final KeepAliveInfo PING = new KeepAliveInfo();
 
    private final OpenWireProtocolManager protocolManager;
@@ -139,17 +143,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private final AtomicBoolean stopping = new AtomicBoolean(false);
 
-   private boolean inServiceException;
-
-   private final AtomicBoolean asyncException = new AtomicBoolean(false);
-
-   // Clebert: Artemis session has meta-data support, perhaps we could reuse it here
    private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
 
    private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>();
    private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>();
 
-   // Clebert TODO: Artemis already stores the Session. Why do we need a different one here
    private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
 
    private ConnectionState state;
@@ -172,6 +170,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
     */
    private ServerSession internalSession;
 
+   private final OperationContext operationContext;
+
    private volatile long lastSent = -1;
    private ConnectionEntry connectionEntry;
    private boolean useKeepAlive;
@@ -185,6 +185,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                              OpenWireFormat wf) {
       super(connection, executor);
       this.server = server;
+      this.operationContext = server.newOperationContext();
       this.protocolManager = openWireProtocolManager;
       this.wireFormat = wf;
       this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
@@ -201,6 +202,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       return info.getUserName();
    }
 
+
+   public OperationContext getOperationContext() {
+      return operationContext;
+   }
+
    // SecurityAuth implementation
    @Override
    public RemotingConnection getRemotingConnection() {
@@ -239,6 +245,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       super.bufferReceived(connectionID, buffer);
       try {
 
+         recoverOperationContext();
+
          Command command = (Command) wireFormat.unmarshal(buffer);
 
          boolean responseRequired = command.isResponseRequired();
@@ -285,17 +293,38 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                }
             }
 
-            // TODO: response through operation-context
-
-            if (response != null && !protocolManager.isStopping()) {
-               response.setCorrelationId(commandId);
-               dispatchSync(response);
-            }
+            sendAsyncResponse(commandId, response);
          }
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.debug(e);
 
          sendException(e);
+      } finally {
+         clearupOperationContext();
+      }
+   }
+
+   /** It will send the response through the operation context, as soon as everything is confirmed on disk */
+   private void sendAsyncResponse(final int commandId, final Response response) throws Exception {
+      if (response != null) {
+         operationContext.executeOnCompletion(new IOCallback() {
+            @Override
+            public void done() {
+               if (!protocolManager.isStopping()) {
+                  try {
+                     response.setCorrelationId(commandId);
+                     dispatchSync(response);
+                  } catch (Exception e) {
+                     sendException(e);
+                  }
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               sendException(new IOException(errorCode + "-" + errorMessage));
+            }
+         });
       }
    }
 
@@ -626,7 +655,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    }
 
    private void createInternalSession(ConnectionInfo info) throws Exception {
-      internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true);
+      internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext);
    }
 
    //raise the refCount of context
@@ -1083,7 +1112,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processBeginTransaction(TransactionInfo info) throws Exception {
          final TransactionId txID = info.getTransactionId();
 
-         setOperationContext(null);
          try {
             internalSession.resetTX(null);
             if (txID.isXATransaction()) {
@@ -1101,7 +1129,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             }
          } finally {
             internalSession.resetTX(null);
-            clearOpeartionContext();
          }
          return null;
       }
@@ -1118,12 +1145,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          AMQSession session = (AMQSession) tx.getProtocolData();
 
-         setOperationContext(session);
-         try {
-            tx.commit(onePhase);
-         } finally {
-            clearOpeartionContext();
-         }
+         tx.commit(onePhase);
 
          return null;
       }
@@ -1137,21 +1159,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processForgetTransaction(TransactionInfo info) throws Exception {
          TransactionId txID = info.getTransactionId();
 
-         setOperationContext(null);
-         try {
-            if (txID.isXATransaction()) {
-               try {
-                  Xid xid = OpenWireUtil.toXID(info.getTransactionId());
-                  internalSession.xaForget(xid);
-               } catch (Exception e) {
-                  e.printStackTrace();
-                  throw e;
-               }
-            } else {
-               txMap.remove(txID);
+         if (txID.isXATransaction()) {
+            try {
+               Xid xid = OpenWireUtil.toXID(info.getTransactionId());
+               internalSession.xaForget(xid);
+            } catch (Exception e) {
+               e.printStackTrace();
+               throw e;
             }
-         } finally {
-            clearOpeartionContext();
+         } else {
+            txMap.remove(txID);
          }
 
          return null;
@@ -1161,7 +1178,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processPrepareTransaction(TransactionInfo info) throws Exception {
          TransactionId txID = info.getTransactionId();
 
-         setOperationContext(null);
          try {
             if (txID.isXATransaction()) {
                try {
@@ -1177,7 +1193,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             }
          } finally {
             internalSession.resetTX(null);
-            clearOpeartionContext();
          }
 
          return new IntegerResponse(XAResource.XA_RDONLY);
@@ -1187,7 +1202,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processEndTransaction(TransactionInfo info) throws Exception {
          TransactionId txID = info.getTransactionId();
 
-         setOperationContext(null);
          if (txID.isXATransaction()) {
             try {
                Transaction tx = lookupTX(txID, null);
@@ -1204,7 +1218,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             }
          } else {
             txMap.remove(txID);
-            clearOpeartionContext();
          }
 
          return null;
@@ -1267,13 +1280,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          Transaction tx = lookupTX(messageSend.getTransactionId(), session);
 
-         setOperationContext(session);
          session.getCoreSession().resetTX(tx);
          try {
             session.send(producerInfo, messageSend, sendProducerAck);
          } finally {
             session.getCoreSession().resetTX(null);
-            clearOpeartionContext();
          }
 
          return null;
@@ -1283,7 +1294,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processMessageAck(MessageAck ack) throws Exception {
          AMQSession session = getSession(ack.getConsumerId().getParentId());
          Transaction tx = lookupTX(ack.getTransactionId(), session);
-         setOperationContext(session);
          session.getCoreSession().resetTX(tx);
 
          try {
@@ -1291,7 +1301,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             consumerBrokerExchange.acknowledge(ack);
          } finally {
             session.getCoreSession().resetTX(null);
-            clearOpeartionContext();
          }
          return null;
       }
@@ -1367,17 +1376,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    }
 
-   private void setOperationContext(AMQSession session) {
-      OperationContext ctx;
-      if (session == null) {
-         ctx = this.internalSession.getSessionContext();
-      } else {
-         ctx = session.getCoreSession().getSessionContext();
-      }
-      server.getStorageManager().setContext(ctx);
+   private void recoverOperationContext() {
+      server.getStorageManager().setContext(this.operationContext);
    }
 
-   private void clearOpeartionContext() {
+   private void clearupOperationContext() {
       server.getStorageManager().clearContext();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 714a29a..426f4e6 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -107,7 +107,7 @@ public class AMQSession implements SessionCallback {
       // now
 
       try {
-         coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true);
+         coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext());
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1) {
@@ -290,8 +290,6 @@ public class AMQSession implements SessionCallback {
       } else {
          final Connection transportConnection = connection.getTransportConnection();
 
-         //         new Exception("Setting to false").printStackTrace();
-
          if (transportConnection == null) {
             // I don't think this could happen, but just in case, avoiding races
             runnable = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 46f8e4c..6029b37 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
          String name = UUIDGenerator.getInstance().generateStringUUID();
-         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true);
+         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext());
          stompSession.setServerSession(session);
          sessions.put(connection.getID(), stompSession);
       }
@@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
          String name = UUIDGenerator.getInstance().generateStringUUID();
-         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true);
+         ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext());
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 17a305e..8d47f97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -79,6 +79,23 @@ public interface Configuration {
    Configuration setPersistenceEnabled(boolean enable);
 
    /**
+    * Should use fdatasync on journal files.
+    *
+    * @see <a href="http://man7.org/linux/man-pages/man2/fdatasync.2.html">fdatasync</a>
+    *
+    * @return a boolean
+    */
+   boolean isJournalDatasync();
+
+   /**
+    * documented at {@link #isJournalDatasync()} ()}
+    *
+    * @param enable
+    * @return this
+    */
+   Configuration setJournalDatasync(boolean enable);
+
+   /**
     * @return usernames mapped to ResourceLimitSettings
     */
    Map<String, ResourceLimitSettings> getResourceLimitSettings();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 8ff1922..3b66f83 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -78,6 +78,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
 
    private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();
 
+   private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync();
+
    protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod();
 
    private boolean persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery();
@@ -301,6 +303,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
    }
 
    @Override
+   public boolean isJournalDatasync() {
+      return journalDatasync;
+   }
+
+   @Override
+   public ConfigurationImpl setJournalDatasync(boolean enable) {
+      journalDatasync = enable;
+      return this;
+   }
+
+   @Override
    public long getFileDeployerScanPeriod() {
       return fileDeploymentScanPeriod;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 2dccb03..a77b850 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -488,6 +488,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          }
       }
 
+      config.setJournalDatasync(getBoolean(e, "journal-datasync", config.isJournalDatasync()));
+
       config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional()));
 
       config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 24650e1..c0ef93e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -118,6 +118,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       }
 
       bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
+      bindingsFF.setDatasync(config.isJournalDatasync());
 
       Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
 
@@ -135,6 +136,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
       }
 
+      journalFF.setDatasync(config.isJournalDatasync());
+
+
       Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
 
       messageJournal = localMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 149c011..64e496a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
@@ -150,7 +151,9 @@ public class ActiveMQPacketHandler implements ChannelHandler {
             activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
          }
 
-         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true);
+         OperationContext sessionOperationContext = server.newOperationContext();
+
+         ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
          channel.setHandler(handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index a266bff..9b5578c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
 import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
@@ -181,7 +182,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
                                boolean xa,
                                String defaultAddress,
                                SessionCallback callback,
-                               boolean autoCreateQueues) throws Exception;
+                               boolean autoCreateQueues,
+                               OperationContext context) throws Exception;
 
    SecurityStore getSecurityStore();
 
@@ -193,6 +195,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    HierarchicalRepository<AddressSettings> getAddressSettingsRepository();
 
+   OperationContext newOperationContext();
+
    int getConnectionCount();
 
    long getTotalConnectionCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 9a0293e..8e86067 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -426,6 +426,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public OperationContext newOperationContext() {
+      return getStorageManager().newContext(getExecutorFactory().getExecutor());
+   }
+
+   @Override
    public final synchronized void start() throws Exception {
       if (state != SERVER_STATE.STOPPED) {
          logger.debug("Server already started!");
@@ -1190,7 +1195,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                       final boolean xa,
                                       final String defaultAddress,
                                       final SessionCallback callback,
-                                      final boolean autoCreateQueues) throws Exception {
+                                      final boolean autoCreateQueues,
+                                      final OperationContext context) throws Exception {
       String validatedUser = "";
 
       if (securityStore != null) {
@@ -1203,7 +1209,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       checkSessionLimit(validatedUser);
 
-      final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
       final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
 
       sessions.put(name, session);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 4c3e068..8da84fe 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -46,6 +46,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="journal-datasync" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  that means the server will use fdatasync to confirm writes on the disk.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 46f3958..c1639c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -359,6 +359,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(1234567, conf.getGlobalMaxSize());
       assertEquals(37, conf.getMaxDiskUsage());
       assertEquals(123, conf.getDiskScanPeriod());
+
+      assertEquals(false, conf.isJournalDatasync());
    }
 
    private void verifyAddresses() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 29119f8..7f01767 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -450,7 +450,7 @@ public abstract class ActiveMQTestBase extends Assert {
     * @throws Exception
     */
    protected ConfigurationImpl createBasicConfig(final int serverID) {
-      ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD);
+      ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
 
       return configuration;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 3bc14bf..87dbd90 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -49,6 +49,7 @@
       <message-expiry-scan-period>10111213</message-expiry-scan-period>
       <message-expiry-thread-priority>8</message-expiry-thread-priority>
       <id-cache-size>127</id-cache-size>
+      <journal-datasync>false</journal-datasync>
       <persist-id-cache>true</persist-id-cache>
       <populate-validated-user>true</populate-validated-user>
       <connection-ttl-check-interval>98765</connection-ttl-check-interval>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/docs/user-manual/en/configuration-index.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index c47861b..65ef931 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -62,6 +62,7 @@ Name | Description
 [journal-sync-non-transactional](persistence.md)                                      |  if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
 [journal-sync-transactional](persistence.md)                                          |  if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true
 [journal-type](persistence.md)                                                        |  the type of journal to use. Default=ASYNCIO
+[journal-datasync](persistence.md)                                                        |  It will use fsync on journal operations. Default=true.
 [large-messages-directory](large-messages.md "Configuring the server")          |  the directory to store large messages. Default=data/largemessages
 [management-address](management.md "Configuring Core Management")   |  the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management
 [management-notification-address](management.md "Configuring The Core Management Notification Address") |  the name of the address that consumers bind to receive management notifications. Default=activemq.notifications

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bcbbc868/docs/user-manual/en/persistence.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md
index cee06f4..6f9c481 100644
--- a/docs/user-manual/en/persistence.md
+++ b/docs/user-manual/en/persistence.md
@@ -298,6 +298,10 @@ The message journal is configured using the following attributes in
     data files on the journal
 
     The default for this parameter is `30`
+    
+-   `journal-datasync` (default: true)
+    
+    This will disable the use of fdatasync on journal writes.
 
 ### An important note on disabling disk write cache.
 


Mime
View raw message