activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [06/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:36 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java
new file mode 100644
index 0000000..2cd5efe
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallback.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.core.journal.RecordInfo;
+
+/**
+ * Callback with one notification method per kind of journal record: add, update and delete
+ * (each in a plain and a transactional "TX" variant), prepare, commit and rollback, plus
+ * {@link #markAsDataFile(JournalFile)}.
+ * <p>
+ * NOTE(review): presumably invoked by the journal reader while it scans a journal file; the
+ * caller is not visible from this interface, so confirm ordering and threading guarantees there.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public interface JournalReaderCallback
+{
+   /**
+    * Notification for an add record.
+    *
+    * @param info the record that was read
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadAddRecord(RecordInfo info) throws Exception;
+
+   /**
+    * Notification for an update record.
+    *
+    * @param recordInfo the record that was read
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadUpdateRecord(RecordInfo recordInfo) throws Exception;
+
+   /**
+    * Notification for a delete record.
+    *
+    * @param recordID id of the record being deleted
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadDeleteRecord(long recordID) throws Exception;
+
+   /**
+    * Notification for an add record that belongs to a transaction.
+    *
+    * @param transactionID id of the transaction
+    * @param recordInfo the record that was read
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+   /**
+    * Notification for an update record that belongs to a transaction.
+    *
+    * @param transactionID id of the transaction
+    * @param recordInfo the record that was read
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+   /**
+    * Notification for a delete record that belongs to a transaction.
+    *
+    * @param transactionID id of the transaction
+    * @param recordInfo the delete record that was read
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+   /**
+    * Notification for a prepare record.
+    *
+    * @param transactionID id of the transaction being prepared
+    * @param extraData extra bytes carried by the prepare record
+    * @param numberOfRecords record count carried by the prepare record (presumably the number of
+    *                        records written for the transaction - TODO confirm against the reader)
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
+
+   /**
+    * Notification for a commit record.
+    *
+    * @param transactionID id of the transaction being committed
+    * @param numberOfRecords record count carried by the commit record (presumably the number of
+    *                        records written for the transaction - TODO confirm against the reader)
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception;
+
+   /**
+    * Notification for a rollback record.
+    *
+    * @param transactionID id of the transaction being rolled back
+    * @throws Exception if the implementation fails to handle the record
+    */
+   void onReadRollbackRecord(long transactionID) throws Exception;
+
+   /**
+    * NOTE(review): the exact meaning is not visible here; by its name this flags {@code file} as
+    * a data file - confirm against the implementations. Unlike the other callbacks it declares
+    * no checked exception.
+    *
+    * @param file the journal file being flagged
+    */
+   void markAsDataFile(JournalFile file);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java
new file mode 100644
index 0000000..fdd50bd
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalReaderCallbackAbstract.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.core.journal.RecordInfo;
+
+/**
+ * A JournalReaderCallbackAbstract
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalReaderCallbackAbstract implements JournalReaderCallback
+{
+   // Every callback below has an empty body, so a subclass can override just the
+   // notifications it is interested in. Methods follow the order of the interface.
+
+   @Override
+   public void onReadAddRecord(final RecordInfo info) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadDeleteRecord(final long recordID) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void onReadRollbackRecord(final long transactionID) throws Exception
+   {
+      // no-op
+   }
+
+   @Override
+   public void markAsDataFile(final JournalFile file)
+   {
+      // no-op
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java
new file mode 100644
index 0000000..414f01d
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecord.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq6.api.core.Pair;
+
+/**
+ * Holds the relationship a record has with journal files for reference counting: the file
+ * that received the add, plus every file that later received an update of the record.
+ * Note: This class used to be called PosFiles
+ *
+ * Used on the ref-count for reclaiming
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * */
+public class JournalRecord
+{
+   /** File in which the record was added. */
+   private final JournalFile addFile;
+
+   /** Size that was accounted on {@link #addFile} when the record was added. */
+   private final int size;
+
+   /** Files that received updates, each paired with the update's size; created on first update. */
+   private List<Pair<JournalFile, Integer>> updateFiles;
+
+   /**
+    * Registers the add on {@code addFile}: increments its positive count and adds {@code size} to it.
+    *
+    * @param addFile file in which the record was added
+    * @param size size to account on {@code addFile}
+    */
+   public JournalRecord(final JournalFile addFile, final int size)
+   {
+      this.addFile = addFile;
+      this.size = size;
+
+      addFile.incPosCount();
+      addFile.addSize(size);
+   }
+
+   /**
+    * Remembers an update of this record and accounts it on {@code updateFile}
+    * (positive count and size).
+    *
+    * @param updateFile file that received the update
+    * @param size size to account on {@code updateFile}
+    */
+   void addUpdateFile(final JournalFile updateFile, final int size)
+   {
+      List<Pair<JournalFile, Integer>> updates = updateFiles;
+
+      if (updates == null)
+      {
+         updates = new ArrayList<Pair<JournalFile, Integer>>();
+         updateFiles = updates;
+      }
+
+      updates.add(new Pair<JournalFile, Integer>(updateFile, size));
+
+      updateFile.incPosCount();
+      updateFile.addSize(size);
+   }
+
+   /**
+    * Accounts the deletion of this record: {@code file} gets a negative count against the add
+    * file and against every update file, and each of those files has the corresponding size
+    * subtracted.
+    *
+    * @param file file on which the negative counts are recorded
+    */
+   void delete(final JournalFile file)
+   {
+      file.incNegCount(addFile);
+      addFile.decSize(size);
+
+      if (updateFiles == null)
+      {
+         return;
+      }
+
+      for (final Pair<JournalFile, Integer> entry : updateFiles)
+      {
+         final JournalFile updatedFile = entry.getA();
+
+         file.incNegCount(updatedFile);
+         updatedFile.decSize(entry.getB());
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      final StringBuilder text = new StringBuilder("JournalRecord(add=");
+      text.append(addFile.getFile().getFileName());
+
+      if (updateFiles != null)
+      {
+         for (final Pair<JournalFile, Integer> entry : updateFiles)
+         {
+            text.append(", update=").append(entry.getA().getFile().getFileName());
+         }
+      }
+
+      return text.append(")").toString();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java
new file mode 100644
index 0000000..e72fe49
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalRecordProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.Map;
+
+/**
+ * This is an interface used only internally.
+ *
+ * During a TX.commit, the JournalTransaction needs to get a valid list of records from either the
+ * JournalImpl or the JournalCompactor.
+ *
+ * When a commit is read, the JournalTransaction asks the JournalCompactor about the existing records.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalRecordProvider
+{
+   /**
+    * @return the compactor to consult; may be {@code null} (JournalTransaction null-checks the
+    *         result in several places before using it)
+    */
+   JournalCompactor getCompactor();
+
+   /**
+    * @return the records known to this provider, keyed by record id; JournalTransaction looks
+    *         entries up, inserts them and removes them by id, so the returned map is mutated
+    *         by callers
+    */
+   Map<Long, JournalRecord> getRecords();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java
new file mode 100644
index 0000000..a5dadbc
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalTransaction.java
@@ -0,0 +1,456 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord;
+
+/**
+ * A JournalTransaction
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalTransaction
+{
+
+   private JournalRecordProvider journal;
+
+   private List<JournalUpdate> pos;
+
+   private List<JournalUpdate> neg;
+
+   private final long id;
+
+   // All the files this transaction is touching on.
+   // We can't have those files being reclaimed if there is a pending transaction
+   private Set<JournalFile> pendingFiles;
+
+   private TransactionCallback currentCallback;
+
+   private boolean compacting = false;
+
+   private Map<JournalFile, TransactionCallback> callbackList;
+
+   private JournalFile lastFile = null;
+
+   private final AtomicInteger counter = new AtomicInteger();
+
+   public JournalTransaction(final long id, final JournalRecordProvider journal)
+   {
+      this.id = id;
+      this.journal = journal;
+   }
+
+   public void replaceRecordProvider(final JournalRecordProvider provider)
+   {
+      journal = provider;
+   }
+
+   /**
+    * @return the id
+    */
+   public long getId()
+   {
+      return id;
+   }
+
+   public int getCounter(final JournalFile file)
+   {
+      return internalgetCounter(file).intValue();
+   }
+
+   public void incCounter(final JournalFile file)
+   {
+      internalgetCounter(file).incrementAndGet();
+   }
+
+   public long[] getPositiveArray()
+   {
+      if (pos == null)
+      {
+         return new long[0];
+      }
+      else
+      {
+         int i = 0;
+         long[] ids = new long[pos.size()];
+         for (JournalUpdate el : pos)
+         {
+            ids[i++] = el.getId();
+         }
+         return ids;
+      }
+   }
+
+   public void setCompacting()
+   {
+      compacting = true;
+
+      // Everything is cleared on the transaction...
+      // since we are compacting, everything is at the compactor's level
+      clear();
+   }
+
+   /**
+    * Used to merge transactions from compacting: appends the positive updates, negative updates
+    * and pending files of {@code other} to this transaction (creating the local collections on
+    * demand) and then clears the compacting flag.
+    *
+    * @param other the transaction whose state is folded into this one
+    */
+   public void merge(final JournalTransaction other)
+   {
+      final List<JournalUpdate> otherPos = other.pos;
+      if (otherPos != null)
+      {
+         if (pos == null)
+         {
+            pos = new ArrayList<JournalUpdate>();
+         }
+         pos.addAll(otherPos);
+      }
+
+      final List<JournalUpdate> otherNeg = other.neg;
+      if (otherNeg != null)
+      {
+         if (neg == null)
+         {
+            neg = new ArrayList<JournalUpdate>();
+         }
+         neg.addAll(otherNeg);
+      }
+
+      final Set<JournalFile> otherPending = other.pendingFiles;
+      if (otherPending != null)
+      {
+         if (pendingFiles == null)
+         {
+            pendingFiles = new HashSet<JournalFile>();
+         }
+         pendingFiles.addAll(otherPending);
+      }
+
+      compacting = false;
+   }
+
+   /**
+    *
+    */
+   public void clear()
+   {
+      // / Compacting is recreating all the previous files and everything
+      // / so we just clear the list of previous files, previous pos and previous adds
+      // / The transaction may be working at the top from now
+
+      if (pendingFiles != null)
+      {
+         pendingFiles.clear();
+      }
+
+      if (callbackList != null)
+      {
+         callbackList.clear();
+      }
+
+      if (pos != null)
+      {
+         pos.clear();
+      }
+
+      if (neg != null)
+      {
+         neg.clear();
+      }
+
+      counter.set(0);
+
+      lastFile = null;
+
+      currentCallback = null;
+   }
+
+   /**
+    * @param currentFile
+    * @param data
+    */
+   public void fillNumberOfRecords(final JournalFile currentFile, final JournalInternalRecord data)
+   {
+      data.setNumberOfRecords(getCounter(currentFile));
+   }
+
+   public TransactionCallback getCallback(final JournalFile file) throws Exception
+   {
+      if (callbackList == null)
+      {
+         callbackList = new HashMap<JournalFile, TransactionCallback>();
+      }
+
+      currentCallback = callbackList.get(file);
+
+      if (currentCallback == null)
+      {
+         currentCallback = new TransactionCallback();
+         callbackList.put(file, currentCallback);
+      }
+
+      if (currentCallback.getErrorMessage() != null)
+      {
+         throw HornetQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
+      }
+
+      currentCallback.countUp();
+
+      return currentCallback;
+   }
+
+   public void addPositive(final JournalFile file, final long id, final int size)
+   {
+      incCounter(file);
+
+      addFile(file);
+
+      if (pos == null)
+      {
+         pos = new ArrayList<JournalUpdate>();
+      }
+
+      pos.add(new JournalUpdate(file, id, size));
+   }
+
+   public void addNegative(final JournalFile file, final long id)
+   {
+      incCounter(file);
+
+      addFile(file);
+
+      if (neg == null)
+      {
+         neg = new ArrayList<JournalUpdate>();
+      }
+
+      neg.add(new JournalUpdate(file, id, 0));
+   }
+
+   /**
+    * Applies the outcome of this transaction.
+    * <p>
+    * While the transaction is flagged as compacting and a compactor is available, the commit is
+    * only queued on the compactor. Otherwise every positive update is registered in the
+    * journal's record map (or queued on the compactor when the compactor knows the record),
+    * every negative update removes the record from that map (or is queued on the compactor when
+    * one is present), and finally {@code file} receives a negative count for each pending file
+    * of this transaction.
+    * <p>
+    * The caller of this method needs to guarantee appendLock.lock at the journal (unless this is
+    * being called from load, which is a single-threaded process).
+    *
+    * @param file file on which the negative counts are recorded (presumably the file that holds
+    *             the commit record - confirm against the caller)
+    */
+   public void commit(final JournalFile file)
+   {
+      JournalCompactor compactor = journal.getCompactor();
+
+      // Same guard as rollback(): the provider may return no compactor even though the
+      // compacting flag is still set, and dereferencing it would throw a NullPointerException.
+      if (compacting && compactor != null)
+      {
+         compactor.addCommandCommit(this, file);
+      }
+      else
+      {
+
+         if (pos != null)
+         {
+            for (JournalUpdate trUpdate : pos)
+            {
+               JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
+
+               if (compactor != null && compactor.lookupRecord(trUpdate.id))
+               {
+                  // This is a case where the transaction was opened after compacting was started,
+                  // but the commit arrived while compacting was working
+                  // We need to cache the counter update, so compacting will take the correct files when it is done
+                  compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size);
+               }
+               else if (posFiles == null)
+               {
+                  posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
+
+                  journal.getRecords().put(trUpdate.id, posFiles);
+               }
+               else
+               {
+                  posFiles.addUpdateFile(trUpdate.file, trUpdate.size);
+               }
+            }
+         }
+
+         if (neg != null)
+         {
+            for (JournalUpdate trDelete : neg)
+            {
+               if (compactor != null)
+               {
+                  compactor.addCommandDelete(trDelete.id, trDelete.file);
+               }
+               else
+               {
+                  JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+
+                  if (posFiles != null)
+                  {
+                     posFiles.delete(trDelete.file);
+                  }
+               }
+            }
+         }
+
+         // Now add negs for the pos we added in each file in which there were
+         // transactional operations
+         // (pendingFiles is created lazily, so it is still null when no file was ever registered)
+
+         if (pendingFiles != null)
+         {
+            for (JournalFile jf : pendingFiles)
+            {
+               file.incNegCount(jf);
+            }
+         }
+      }
+   }
+
+   public void waitCallbacks() throws InterruptedException
+   {
+      if (callbackList != null)
+      {
+         for (TransactionCallback callback : callbackList.values())
+         {
+            callback.waitCompletion();
+         }
+      }
+   }
+
+   /** Wait completion at the latest file only */
+   public void waitCompletion() throws Exception
+   {
+      if (currentCallback != null)
+      {
+         currentCallback.waitCompletion();
+      }
+   }
+
+   /**
+    * Rolls this transaction back.
+    * <p>
+    * While the transaction is flagged as compacting and a compactor is available, the rollback
+    * is queued on the compactor. Otherwise {@code file} receives a negative count for each
+    * pending file of this transaction.
+    * <p>
+    * The caller of this method needs to guarantee appendLock.lock before calling this method if being used outside of the lock context.
+    * or else potFilesMap could be affected
+    *
+    * @param file file on which the negative counts are recorded (per the comment below, the file
+    *             containing the rollback record)
+    */
+   public void rollback(final JournalFile file)
+   {
+      JournalCompactor compactor = journal.getCompactor();
+
+      if (compacting && compactor != null)
+      {
+         compactor.addCommandRollback(this, file);
+      }
+      else if (pendingFiles != null)
+      {
+         // pendingFiles is created lazily, so it is still null when no file was ever registered
+         // for this transaction; there is nothing to count in that case.
+
+         // Now add negs for the pos we added in each file in which there were
+         // transactional operations
+         // Note that we do this on rollback as we do on commit, since we need
+         // to ensure the file containing
+         // the rollback record doesn't get deleted before the files with the
+         // transactional operations are deleted
+         // Otherwise we may run into problems especially with XA where we are
+         // just left with a prepare when the tx
+         // has actually been rolled back
+
+         for (JournalFile jf : pendingFiles)
+         {
+            file.incNegCount(jf);
+         }
+      }
+   }
+
+   /**
+    * The caller of this method needs to guarantee appendLock.lock before calling this method if being used outside of the lock context.
+    * or else potFilesMap could be affected
+    * */
+   public void prepare(final JournalFile file)
+   {
+      // We don't want the prepare record getting deleted before time
+
+      addFile(file);
+   }
+
+   /**
+    * Used by load, when the transaction was not loaded correctly.
+    * <p>
+    * Decrements the positive count of every pending file, reversing the increment made when the
+    * file was registered for this transaction. Does nothing when no file was ever registered.
+    */
+   public void forget()
+   {
+      // pendingFiles is created lazily by addFile(), so it may still be null here.
+      if (pendingFiles == null)
+      {
+         return;
+      }
+
+      // The transaction was not committed or rolled back in the file, so we
+      // reverse any pos counts we added
+      for (JournalFile jf : pendingFiles)
+      {
+         jf.decPosCount();
+      }
+
+   }
+
+   @Override
+   public String toString()
+   {
+      return "JournalTransaction(" + id + ")";
+   }
+
+   private AtomicInteger internalgetCounter(final JournalFile file)
+   {
+      if (lastFile != file)
+
+      {
+         lastFile = file;
+         counter.set(0);
+      }
+      return counter;
+   }
+
+   private void addFile(final JournalFile file)
+   {
+      if (pendingFiles == null)
+      {
+         pendingFiles = new HashSet<JournalFile>();
+      }
+
+      if (!pendingFiles.contains(file))
+      {
+         pendingFiles.add(file);
+
+         // We add a pos for the transaction itself in the file - this
+         // prevents any transactional operations
+         // being deleted before a commit or rollback is written
+         file.incPosCount();
+      }
+   }
+
+   private static class JournalUpdate
+   {
+      private final JournalFile file;
+
+      long id;
+
+      int size;
+
+      /**
+       * @param file
+       * @param id
+       * @param size
+       */
+      private JournalUpdate(final JournalFile file, final long id, final int size)
+      {
+         super();
+         this.file = file;
+         this.id = id;
+         this.size = size;
+      }
+
+      /**
+       * @return the id
+       */
+      public long getId()
+      {
+         return id;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java
new file mode 100644
index 0000000..97a70dc
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFile.java
@@ -0,0 +1,415 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.api.core.HornetQIOErrorException;
+import org.apache.activemq6.api.core.HornetQIllegalStateException;
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.journal.HornetQJournalBundle;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * A NIOSequentialFile
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ */
+public final class NIOSequentialFile extends AbstractSequentialFile
+{
+   private FileChannel channel;
+
+   private RandomAccessFile rfile;
+
+   /**
+    * The write semaphore here is only used when writing asynchronously
+    */
+   private Semaphore maxIOSemaphore;
+
+   private final int defaultMaxIO;
+
+   private int maxIO;
+
+   public NIOSequentialFile(final SequentialFileFactory factory,
+                            final String directory,
+                            final String fileName,
+                            final int maxIO,
+                            final Executor writerExecutor)
+   {
+      super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
+      defaultMaxIO = maxIO;
+   }
+
+   public NIOSequentialFile(final SequentialFileFactory factory,
+                            final File file,
+                            final int maxIO,
+                            final Executor writerExecutor)
+   {
+      super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
+      defaultMaxIO = maxIO;
+   }
+
+   public int getAlignment()
+   {
+      return 1;
+   }
+
+   public int calculateBlockStart(final int position)
+   {
+      return position;
+   }
+
+   public synchronized boolean isOpen()
+   {
+      return channel != null;
+   }
+
+   /**
+    * Opens the file with {@code defaultMaxIO} (the maxIO value given to the constructor) and with
+    * the writer executor enabled, by delegating to {@link #open(int, boolean)}.
+    * Some operations while initializing files on the journal may require a different maxIO;
+    * those can call {@link #open(int, boolean)} directly.
+    *
+    * @throws IOException if the underlying file cannot be opened
+    */
+   public synchronized void open() throws IOException
+   {
+      open(defaultMaxIO, true);
+   }
+
+   public void open(final int maxIO, final boolean useExecutor) throws IOException
+   {
+      try
+      {
+         rfile = new RandomAccessFile(getFile(), "rw");
+
+         channel = rfile.getChannel();
+
+         fileSize = channel.size();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+
+      if (writerExecutor != null && useExecutor)
+      {
+         maxIOSemaphore = new Semaphore(maxIO);
+         this.maxIO = maxIO;
+      }
+   }
+
+   public void fill(final int position, final int size, final byte fillCharacter) throws IOException
+   {
+      ByteBuffer bb = ByteBuffer.allocate(size);
+
+      for (int i = 0; i < size; i++)
+      {
+         bb.put(fillCharacter);
+      }
+
+      bb.flip();
+
+      try
+      {
+         channel.position(position);
+         channel.write(bb);
+         channel.force(false);
+         channel.position(0);
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+
+      fileSize = channel.size();
+   }
+
+   public synchronized void waitForClose() throws InterruptedException
+   {
+      while (isOpen())
+      {
+         wait();
+      }
+   }
+
+   /**
+    * Closes the file: waits for the asynchronous writes to drain, closes the channel and the
+    * underlying RandomAccessFile, and wakes up any thread blocked in {@link #waitForClose()}.
+    * <p>
+    * The handles are dropped and waiters are notified even when closing fails, so a failed close
+    * can neither leave {@link #waitForClose()} callers blocked nor skip closing the
+    * RandomAccessFile.
+    *
+    * @throws IOException if closing the channel or the file fails (also reported to the factory)
+    * @throws InterruptedException if interrupted while waiting for pending writes
+    * @throws HornetQException declared for the superclass close
+    */
+   @Override
+   public synchronized void close() throws IOException, InterruptedException, HornetQException
+   {
+      super.close();
+
+      if (maxIOSemaphore != null)
+      {
+         // Each asynchronous write holds one permit until it finishes, so obtaining all maxIO
+         // permits means no write is still in flight. Log and keep waiting on every timeout.
+         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+         {
+            HornetQJournalLogger.LOGGER.errorClosingFile(getFileName());
+         }
+      }
+
+      maxIOSemaphore = null;
+      try
+      {
+         try
+         {
+            if (channel != null)
+            {
+               channel.close();
+            }
+         }
+         finally
+         {
+            // Close the file itself even when closing the channel failed.
+            if (rfile != null)
+            {
+               rfile.close();
+            }
+         }
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+      finally
+      {
+         // Reset state and notify on every path; otherwise isOpen() would stay true after a
+         // failed close and waitForClose() would never return.
+         channel = null;
+
+         rfile = null;
+
+         notifyAll();
+      }
+   }
+
+   public int read(final ByteBuffer bytes) throws Exception
+   {
+      return read(bytes, null);
+   }
+
+   public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws IOException,
+      HornetQIllegalStateException
+   {
+      try
+      {
+         if (channel == null)
+         {
+            throw new HornetQIllegalStateException("File " + this.getFileName() + " has a null channel");
+         }
+         int bytesRead = channel.read(bytes);
+
+         if (callback != null)
+         {
+            callback.done();
+         }
+
+         bytes.flip();
+
+         return bytesRead;
+      }
+      catch (IOException e)
+      {
+         if (callback != null)
+         {
+            callback.onError(HornetQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
+         }
+
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+
+         throw e;
+      }
+   }
+
+   public void sync() throws IOException
+   {
+      if (channel != null)
+      {
+         try
+         {
+            channel.force(false);
+         }
+         catch (IOException e)
+         {
+            factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+            throw e;
+         }
+      }
+   }
+
+   public long size() throws IOException
+   {
+      if (channel == null)
+      {
+         return getFile().length();
+      }
+
+      try
+      {
+         return channel.size();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+   }
+
+   @Override
+   public void position(final long pos) throws IOException
+   {
+      try
+      {
+         super.position(pos);
+         channel.position(pos);
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return "NIOSequentialFile " + getFile();
+   }
+
+   /**
+    * Creates a new, unopened NIOSequentialFile for the same file, sharing this instance's
+    * factory and writer executor.
+    * <p>
+    * The clone receives {@code defaultMaxIO} rather than {@code maxIO}: {@code maxIO} is only
+    * assigned by {@link #open(int, boolean)} when an executor is used, so before that it is
+    * still 0, and a clone configured with 0 would create a semaphore without permits on open().
+    *
+    * @return a new sequential file pointing at the same path
+    */
+   public SequentialFile cloneFile()
+   {
+      return new NIOSequentialFile(factory, getFile(), defaultMaxIO, writerExecutor);
+   }
+
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
+   {
+      if (callback == null)
+      {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+
+      try
+      {
+         internalWrite(bytes, sync, callback);
+      }
+      catch (Exception e)
+      {
+         callback.onError(HornetQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
+      }
+   }
+
+   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
+   {
+      internalWrite(bytes, sync, null);
+   }
+
+   public void writeInternal(final ByteBuffer bytes) throws Exception
+   {
+      internalWrite(bytes, true, null);
+   }
+
+   @Override
+   protected ByteBuffer newBuffer(int size, final int limit)
+   {
+      // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
+
+      size = limit;
+
+      return super.newBuffer(size, limit);
+   }
+
+   /**
+    * Writes {@code bytes} to the channel, either on the calling thread or through the writer
+    * executor.
+    * <p>
+    * When the file is not open the write is rejected: through {@code callback.onError} if a
+    * callback was given, otherwise by throwing the bundle's "file not opened" exception.
+    * <p>
+    * The write runs on the calling thread when there is no semaphore (file opened without an
+    * executor) or no callback. Otherwise one permit is taken from {@code maxIOSemaphore}, the
+    * write is submitted to {@code writerExecutor}, and the permit is released when the task ends.
+    * <p>
+    * NOTE(review): on the calling-thread path an IOException is reported to the factory but is
+    * neither rethrown nor passed to the callback, so the caller is not told the write failed.
+    * Confirm this is intended before relying on it.
+    *
+    * @param bytes buffer to write; its limit is added to the tracked position before writing
+    * @param sync whether to force the channel after writing
+    * @param callback completion/error callback, or {@code null} for a purely synchronous write
+    * @throws IOException declared, but see the note above for the calling-thread path
+    * @throws HornetQIOErrorException when the file is not open and no callback was given
+    *                                 (assumed to be the type thrown by the bundle - TODO confirm)
+    * @throws InterruptedException if interrupted while waiting for a semaphore permit
+    */
+   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException, HornetQIOErrorException, InterruptedException
+   {
+      if (!isOpen())
+      {
+         if (callback != null)
+         {
+            callback.onError(HornetQExceptionType.IO_ERROR.getCode(), "File not opened");
+         }
+         else
+         {
+            throw HornetQJournalBundle.BUNDLE.fileNotOpened();
+         }
+         return;
+      }
+
+      // The tracked position is advanced before the write is performed (or even scheduled).
+      position.addAndGet(bytes.limit());
+
+      if (maxIOSemaphore == null || callback == null)
+      {
+         // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
+         try
+         {
+            doInternalWrite(bytes, sync, callback);
+         }
+         catch (IOException e)
+         {
+            // Reported to the factory only; not rethrown and the callback is not notified.
+            factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         }
+      }
+      else
+      {
+         // This is a flow control on writing, just like maxAIO on libaio
+         maxIOSemaphore.acquire();
+
+         writerExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  try
+                  {
+                     doInternalWrite(bytes, sync, callback);
+                  }
+                  catch (IOException e)
+                  {
+                     HornetQJournalLogger.LOGGER.errorSubmittingWrite(e);
+                     factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
+                     callback.onError(HornetQExceptionType.IO_ERROR.getCode(), e.getMessage());
+                  }
+                  catch (Throwable e)
+                  {
+                     HornetQJournalLogger.LOGGER.errorSubmittingWrite(e);
+                     callback.onError(HornetQExceptionType.IO_ERROR.getCode(), e.getMessage());
+                  }
+               }
+               finally
+               {
+                  // Give the permit back whether the write succeeded or failed.
+                  maxIOSemaphore.release();
+               }
+            }
+         });
+      }
+   }
+
+   /**
+    * @param bytes
+    * @param sync
+    * @param callback
+    * @throws IOException
+    * @throws Exception
+    */
+   private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
+   {
+      channel.write(bytes);
+
+      if (sync)
+      {
+         sync();
+      }
+
+      if (callback != null)
+      {
+         callback.done();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java
new file mode 100644
index 0000000..2137477
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/NIOSequentialFileFactory.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.core.journal.IOCriticalErrorListener;
+import org.apache.activemq6.core.journal.SequentialFile;
+
+/**
+ * A sequential file factory that creates {@link NIOSequentialFile} instances and the
+ * {@link ByteBuffer}s used with them.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ */
+public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
+{
+   /**
+    * Creates a factory without a critical error listener; see
+    * {@link #NIOSequentialFileFactory(String, IOCriticalErrorListener)}.
+    *
+    * @param journalDir directory passed on to the superclass
+    */
+   public NIOSequentialFileFactory(final String journalDir)
+   {
+      this(journalDir, null);
+   }
+
+   /**
+    * Creates a non-buffered factory using the default NIO buffer size and timeout, without rate logging.
+    *
+    * @param journalDir directory passed on to the superclass
+    * @param listener   critical error listener passed on to the superclass; may be {@code null}
+    */
+   public NIOSequentialFileFactory(final String journalDir, final IOCriticalErrorListener listener)
+   {
+      this(journalDir,
+           false,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
+           false,
+           listener);
+   }
+
+   /**
+    * Creates a factory without a critical error listener; see
+    * {@link #NIOSequentialFileFactory(String, boolean, IOCriticalErrorListener)}.
+    *
+    * @param journalDir directory passed on to the superclass
+    * @param buffered   buffering flag passed on to the superclass
+    */
+   public NIOSequentialFileFactory(final String journalDir, final boolean buffered)
+   {
+      this(journalDir, buffered, null);
+   }
+
+   /**
+    * Creates a factory using the default NIO buffer size and timeout, without rate logging.
+    *
+    * @param journalDir directory passed on to the superclass
+    * @param buffered   buffering flag passed on to the superclass
+    * @param listener   critical error listener passed on to the superclass; may be {@code null}
+    */
+   public NIOSequentialFileFactory(final String journalDir,
+                                   final boolean buffered,
+                                   final IOCriticalErrorListener listener)
+   {
+      this(journalDir,
+           buffered,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
+           false,
+           listener);
+   }
+
+   /**
+    * Creates a fully parameterized factory without a critical error listener.
+    *
+    * @param journalDir    directory passed on to the superclass
+    * @param buffered      buffering flag passed on to the superclass
+    * @param bufferSize    buffer size passed on to the superclass
+    * @param bufferTimeout buffer timeout passed on to the superclass
+    * @param logRates      rate logging flag passed on to the superclass
+    */
+   public NIOSequentialFileFactory(final String journalDir,
+                                   final boolean buffered,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final boolean logRates)
+   {
+      this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null);
+   }
+
+   /**
+    * Creates a fully parameterized factory; every argument is handed to the superclass unchanged.
+    *
+    * @param journalDir    directory passed on to the superclass
+    * @param buffered      buffering flag passed on to the superclass
+    * @param bufferSize    buffer size passed on to the superclass
+    * @param bufferTimeout buffer timeout passed on to the superclass
+    * @param logRates      rate logging flag passed on to the superclass
+    * @param listener      critical error listener passed on to the superclass; may be {@code null}
+    */
+   public NIOSequentialFileFactory(final String journalDir,
+                                   final boolean buffered,
+                                   final int bufferSize,
+                                   final int bufferTimeout,
+                                   final boolean logRates,
+                                   final IOCriticalErrorListener listener)
+   {
+      super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener);
+   }
+
+   /**
+    * Creates a new {@link NIOSequentialFile} in this factory's journal directory.
+    *
+    * @param fileName name of the file
+    * @param maxIO    maximum number of concurrent IO operations; values below 1 are treated as 1
+    *                 (single threaded IO)
+    * @return the new sequential file
+    */
+   public SequentialFile createSequentialFile(final String fileName, int maxIO)
+   {
+      if (maxIO < 1)
+      {
+         // A single threaded IO
+         maxIO = 1;
+      }
+
+      return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
+   }
+
+   /**
+    * @return {@code true} when the inherited {@code timedBuffer} is set
+    */
+   public boolean isSupportsCallbacks()
+   {
+      return timedBuffer != null;
+   }
+
+
+   /**
+    * Allocates a direct buffer of {@code size} bytes.
+    * <p>
+    * If the first attempt fails with an {@link OutOfMemoryError}, garbage collections are requested
+    * for up to 5 seconds (or until a weakly referenced probe object has been collected) and the
+    * allocation is attempted exactly once more. A failure of the second attempt is propagated.
+    *
+    * @param size capacity of the buffer in bytes
+    * @return the newly allocated direct buffer
+    */
+   public ByteBuffer allocateDirectBuffer(final int size)
+   {
+      // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
+      try
+      {
+         return ByteBuffer.allocateDirect(size);
+      }
+      catch (OutOfMemoryError error)
+      {
+         // This is a workaround for the way the JDK will deal with native buffers.
+         // the main portion is outside of the VM heap
+         // and the JDK will not have any reference about it to take GC into account
+         // so we force a GC and try again.
+         boolean interrupted = false;
+         final WeakReference<Object> obj = new WeakReference<Object>(new Object());
+         try
+         {
+            final long deadline = System.currentTimeMillis() + 5000;
+            // Loop while the deadline has NOT passed yet and the probe object is still alive.
+            // (The comparison used to be inverted, so no GC was ever requested.)
+            while (System.currentTimeMillis() < deadline && obj.get() != null)
+            {
+               System.gc();
+               Thread.sleep(100);
+            }
+         }
+         catch (InterruptedException e)
+         {
+            // Stop waiting, but remember the interruption so it is not lost for the caller.
+            interrupted = true;
+         }
+
+         try
+         {
+            return ByteBuffer.allocateDirect(size);
+         }
+         finally
+         {
+            // Restored only after the retry so the retry itself is not disturbed by the flag.
+            if (interrupted)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+      }
+   }
+
+   /**
+    * Does nothing: there is no explicit release here, the buffer is left to the garbage collector.
+    *
+    * @param buffer the buffer that is no longer needed
+    */
+   public void releaseDirectBuffer(ByteBuffer buffer)
+   {
+      // nothing we can do on this case. we can just have good faith on GC
+   }
+
+   /**
+    * @param size capacity of the buffer in bytes
+    * @return a new heap buffer of the given capacity
+    */
+   public ByteBuffer newBuffer(final int size)
+   {
+      return ByteBuffer.allocate(size);
+   }
+
+   /**
+    * Overwrites the bytes from index 0 up to the buffer's limit with zeros and leaves the
+    * position at 0.
+    *
+    * @param buffer the buffer to zero out
+    */
+   public void clearBuffer(final ByteBuffer buffer)
+   {
+      final int limit = buffer.limit();
+      buffer.rewind();
+
+      for (int i = 0; i < limit; i++)
+      {
+         buffer.put((byte)0);
+      }
+
+      buffer.rewind();
+   }
+
+   /**
+    * @param bytes the array to wrap
+    * @return a buffer backed by {@code bytes}
+    */
+   public ByteBuffer wrapBuffer(final byte[] bytes)
+   {
+      return ByteBuffer.wrap(bytes);
+   }
+
+   /**
+    * @return always 1
+    */
+   public int getAlignment()
+   {
+      return 1;
+   }
+
+   /**
+    * @param bytes the requested size
+    * @return {@code bytes} unchanged
+    */
+   public int calculateBlockSize(final int bytes)
+   {
+      return bytes;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java
new file mode 100644
index 0000000..c5d1800
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/Reclaimer.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * Decides which journal files can be reclaimed.
+ *
+ * <p>The journal consists of an ordered list of journal files Fn where 0 &lt;= n &lt;= N</p>
+ *
+ * <p>A journal file can contain either positives (pos) or negatives (neg)</p>
+ *
+ * <p>(Positives correspond either to adds or updates, and negatives correspond to deletes).</p>
+ *
+ * <p>A file Fn can be deleted if, and only if, the following criteria are satisfied</p>
+ *
+ * <p>1) All pos in a file Fn, must have corresponding neg in any file Fm where m &gt;= n.</p>
+ *
+ * <p>2) All pos that correspond to any neg in file Fn, must all live in any file Fm where 0 &lt;= m &lt;= n
+ * which are also marked for deletion in the same pass of the algorithm.</p>
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public class Reclaimer
+{
+   private static boolean trace = HornetQJournalLogger.LOGGER.isTraceEnabled();
+
+   private static void trace(final String message)
+   {
+      HornetQJournalLogger.LOGGER.trace(message);
+   }
+
+   /**
+    * Walks the files in order and sets the can-reclaim flag of each one according to the
+    * two criteria described on the class.
+    *
+    * @param files the journal files, in journal order
+    */
+   public void scan(final JournalFile[] files)
+   {
+      for (int index = 0; index < files.length; index++)
+      {
+         final JournalFile candidate = files[index];
+
+         final int positives = candidate.getPosCount();
+
+         if (Reclaimer.trace)
+         {
+            Reclaimer.trace("posCount on " + candidate + " = " + positives);
+         }
+
+         // Criterion 1: add up the negatives held by this file and every later one
+         // that point back into the candidate.
+         int negatives = 0;
+
+         for (int later = index; later < files.length; later++)
+         {
+            final JournalFile holder = files[later];
+
+            if (Reclaimer.trace && holder.getNegCount(candidate) != 0)
+            {
+               Reclaimer.trace("Negative from " + holder + " into " + candidate + " = " + holder.getNegCount(candidate));
+            }
+
+            negatives += holder.getNegCount(candidate);
+         }
+
+         // Optimistically flag the candidate; criterion 2 below also looks at the candidate itself.
+         candidate.setCanReclaim(true);
+
+         if (positives > negatives)
+         {
+            // Criterion 1 failed: some positives are still live.
+            candidate.setCanReclaim(false);
+
+            continue;
+         }
+
+         // Criterion 2: every file (up to and including the candidate) that the candidate
+         // holds negatives for must itself be reclaimable.
+         for (int earlier = 0; earlier <= index; earlier++)
+         {
+            final JournalFile target = files[earlier];
+
+            if (candidate.getNegCount(target) == 0 || target.isCanReclaim())
+            {
+               continue;
+            }
+
+            if (Reclaimer.trace)
+            {
+               Reclaimer.trace(candidate + " Can't be reclaimed because " + target + " has negative values");
+            }
+
+            candidate.setCanReclaim(false);
+
+            break;
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java
new file mode 100644
index 0000000..26bfbe0
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SimpleWaitIOCallback.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQExceptionType;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * An IO callback that lets a caller block until {@link #done()} or
+ * {@link #onError(int, String)} has been invoked.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public final class SimpleWaitIOCallback extends SyncIOCompletion
+{
+   /** Released by the first call to either done() or onError(). */
+   private final CountDownLatch latch = new CountDownLatch(1);
+
+   /** Message of the reported error; stays {@code null} while no error was reported. */
+   private volatile String errorMessage;
+
+   /** Code of the reported error; only meaningful when {@link #errorMessage} is set. */
+   private volatile int errorCode = 0;
+
+   @Override
+   public String toString()
+   {
+      return SimpleWaitIOCallback.class.getName();
+   }
+
+   /**
+    * Releases any thread waiting for completion.
+    */
+   public void done()
+   {
+      latch.countDown();
+   }
+
+   /**
+    * Records and logs the error, then releases any thread waiting for completion.
+    *
+    * @param errorCode1    code of the error
+    * @param errorMessage1 description of the error
+    */
+   public void onError(final int errorCode1, final String errorMessage1)
+   {
+      this.errorCode = errorCode1;
+
+      this.errorMessage = errorMessage1;
+
+      HornetQJournalLogger.LOGGER.errorOnIOCallback(errorMessage1);
+
+      latch.countDown();
+   }
+
+   /**
+    * Blocks, in rounds of 2 seconds, until the latch has been released.
+    *
+    * @throws InterruptedException if the waiting thread is interrupted
+    * @throws HornetQException     if an error was reported through {@link #onError(int, String)}
+    */
+   @Override
+   public void waitCompletion() throws InterruptedException, HornetQException
+   {
+      while (!latch.await(2, TimeUnit.SECONDS))
+      {
+         // not released yet, wait for another round
+      }
+
+      throwIfFailed();
+   }
+
+   /**
+    * Blocks until the latch has been released or the timeout has elapsed.
+    *
+    * @param timeout maximum time to wait, in milliseconds
+    * @return {@code true} if the latch was released in time, {@code false} on timeout
+    * @throws InterruptedException if the waiting thread is interrupted
+    * @throws HornetQException     if an error was reported through {@link #onError(int, String)}
+    */
+   public boolean waitCompletion(final long timeout) throws InterruptedException, HornetQException
+   {
+      final boolean released = latch.await(timeout, TimeUnit.MILLISECONDS);
+
+      throwIfFailed();
+
+      return released;
+   }
+
+   @Override
+   public void storeLineUp()
+   {
+   }
+
+   /**
+    * Throws the exception matching the recorded error, if one was recorded.
+    */
+   private void throwIfFailed() throws HornetQException
+   {
+      if (errorMessage != null)
+      {
+         throw HornetQExceptionType.createException(errorCode, errorMessage);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java
new file mode 100644
index 0000000..2c344a1
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncIOCompletion.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.core.journal.IOCompletion;
+
+/**
+ * Internal class used to manage explicit syncs on the Journal through callbacks.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SyncIOCompletion implements IOCompletion
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Waits for this completion.
+    * <p>
+    * NOTE(review): presumably blocks the caller until the tracked IO has finished or failed;
+    * the exact contract is defined by the subclasses - confirm against them.
+    *
+    * @throws Exception declared broadly so that implementations can narrow it
+    */
+   public abstract void waitCompletion() throws Exception;
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java
new file mode 100644
index 0000000..b0628f2
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/SyncSpeedTest.java
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * A SyncSpeedTest
+ *
+ * This class just provides some diagnostics on how fast your disk can sync
+ * Useful when determining performance issues
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> fox
+ *
+ *
+ */
+public class SyncSpeedTest
+{
+   /**
+    * Entry point: runs {@link #testScaleAIO()} on a fresh instance and prints the stack trace
+    * of any exception it throws.
+    *
+    * @param args not used
+    */
+   public static void main(final String[] args)
+   {
+      try
+      {
+         new SyncSpeedTest().testScaleAIO();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   protected SequentialFileFactory fileFactory;
+
+   public boolean AIO = true;
+
+   /**
+    * Stores in {@link #fileFactory} the factory selected by the {@link #AIO} flag,
+    * rooted at the current directory.
+    */
+   protected void setupFactory()
+   {
+      if (!AIO)
+      {
+         fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false, null);
+
+         return;
+      }
+
+      fileFactory = new AIOSequentialFileFactory(".", 0, 0, false, null);
+   }
+
+   /**
+    * Creates the sequential file implementation selected by the {@link #AIO} flag.
+    *
+    * @param fileName name of the file to create the wrapper for
+    * @return an {@code AIOSequentialFile} when {@link #AIO} is set, otherwise an {@code NIOSequentialFile}
+    */
+   protected SequentialFile createSequentialFile(final String fileName)
+   {
+      if (!AIO)
+      {
+         return new NIOSequentialFile(fileFactory, new File(fileName), 1000, null);
+      }
+
+      return new AIOSequentialFile(fileFactory,
+                                   0,
+                                   0,
+                                   ".",
+                                   fileName,
+                                   100000,
+                                   null,
+                                   null,
+                                   Executors.newSingleThreadExecutor());
+   }
+
+   /**
+    * Endless benchmark using the {@link SequentialFile} abstraction: for a record size that
+    * starts at 128 KiB and doubles every round, writes 500 warm-up records followed by 500
+    * timed records with sync requested, then prints the sync rate and throughput.
+    * <p>
+    * The loop has no exit condition; it only ends when an exception is thrown.
+    * NOTE(review): the file opened in each round is not closed here - confirm whether that is intended.
+    *
+    * @throws Exception whatever the file operations throw
+    */
+   public void run2() throws Exception
+   {
+      setupFactory();
+
+      int recordSize = 128 * 1024;
+
+      while (true)
+      {
+         System.out.println("** record size is " + recordSize);
+
+         int warmup = 500;
+
+         int its = 500;
+
+         // int arithmetic: 1000 * recordSize overflows once recordSize reaches 4 MiB
+         int fileSize = (its + warmup) * recordSize;
+
+         SequentialFile file = createSequentialFile("sync-speed-test.dat");
+
+         if (file.exists())
+         {
+            file.delete();
+         }
+
+         file.open();
+
+         file.fill(0, fileSize, (byte)'X');
+
+         if (!AIO)
+         {
+            file.sync();
+         }
+
+         ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
+
+         long start = 0;
+
+         for (int i = 0; i < its + warmup; i++)
+         {
+            // the clock starts only after the warm-up writes
+            if (i == warmup)
+            {
+               start = System.currentTimeMillis();
+            }
+
+            // reset the position so the same record content is written again
+            bb1.rewind();
+
+            file.writeDirect(bb1, true);
+         }
+
+         long end = System.currentTimeMillis();
+
+         // timed writes per second
+         double rate = 1000 * (double)its / (end - start);
+
+         double throughput = recordSize * rate;
+
+         System.out.println("Rate of " + rate + " syncs per sec");
+         System.out.println("Throughput " + throughput + " bytes per sec");
+         System.out.println("*************");
+
+         recordSize *= 2;
+      }
+   }
+
+   /**
+    * Endless benchmark on a raw {@link FileChannel}: for a record size that starts at 256 bytes
+    * and doubles every round, pre-fills the test file, then writes 500 warm-up records followed
+    * by 500 timed records, forcing the channel after each one, and prints the sync rate and
+    * throughput.
+    * <p>
+    * The loop has no exit condition; it only ends when an exception is thrown. The file opened
+    * in a round is closed before the next round starts.
+    *
+    * @throws Exception if the test file cannot be created or an IO operation fails
+    */
+   public void run() throws Exception
+   {
+      int recordSize = 256;
+
+      while (true)
+      {
+         System.out.println("** record size is " + recordSize);
+
+         int warmup = 500;
+
+         int its = 500;
+
+         int fileSize = (its + warmup) * recordSize;
+
+         File file = new File("sync-speed-test.dat");
+
+         if (file.exists())
+         {
+            if (!file.delete())
+            {
+               HornetQJournalLogger.LOGGER.errorDeletingFile(file);
+            }
+         }
+
+         boolean created = file.createNewFile();
+         if (!created)
+         {
+            throw new IOException("could not create file " + file);
+         }
+
+         RandomAccessFile rfile = new RandomAccessFile(file, "rw");
+
+         try
+         {
+            FileChannel channel = rfile.getChannel();
+
+            // pre-fill the file so the timed writes below overwrite existing content
+            ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
+
+            write(bb, channel, fileSize);
+
+            channel.force(true);
+
+            channel.position(0);
+
+            ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
+
+            long start = 0;
+
+            for (int i = 0; i < its + warmup; i++)
+            {
+               // the clock starts only after the warm-up writes
+               if (i == warmup)
+               {
+                  start = System.currentTimeMillis();
+               }
+
+               // the buffer's position is at its end (after filling or after the previous write),
+               // so flip() makes the full record readable again
+               bb1.flip();
+               channel.write(bb1);
+               channel.force(false);
+            }
+
+            long end = System.currentTimeMillis();
+
+            double rate = 1000 * (double)its / (end - start);
+
+            double throughput = recordSize * rate;
+
+            System.out.println("Rate of " + rate + " syncs per sec");
+            System.out.println("Throughput " + throughput + " bytes per sec");
+         }
+         finally
+         {
+            // Closing the RandomAccessFile also closes its channel; without this every round
+            // leaked an open file, which in turn can make the delete at the top of the loop fail.
+            rfile.close();
+         }
+
+         recordSize *= 2;
+      }
+   }
+
+   /**
+    * Measures write throughput with 1 to 10 concurrent writer threads.
+    * <p>
+    * For each thread count a fresh test file is created and pre-filled, every thread issues
+    * 10 asynchronous 1024-byte writes with sync requested, and the method waits until all
+    * threads have finished and every write has reported completion before printing the rate.
+    *
+    * @throws Exception whatever the file operations throw, or if the waiting thread is interrupted
+    */
+   public void testScaleAIO() throws Exception
+   {
+      setupFactory();
+
+      final int recordSize = 1024;
+
+      System.out.println("** record size is " + recordSize);
+
+      final int its = 10;
+
+      for (int numThreads = 1; numThreads <= 10; numThreads++)
+      {
+
+         int fileSize = its * recordSize * numThreads;
+
+         final SequentialFile file = createSequentialFile("sync-speed-test.dat");
+
+         if (file.exists())
+         {
+            file.delete();
+         }
+
+         file.open();
+
+         file.fill(0, fileSize, (byte)'X');
+
+         if (!AIO)
+         {
+            file.sync();
+         }
+
+         // one count per expected write completion
+         final CountDownLatch latch = new CountDownLatch(its * numThreads);
+
+         class MyIOAsyncTask implements IOAsyncTask
+         {
+            public void done()
+            {
+               latch.countDown();
+            }
+
+            // NOTE(review): errors are ignored and do not count the latch down, so if a write
+            // reports onError instead of done, latch.await() below would never return - confirm.
+            public void onError(final int errorCode, final String errorMessage)
+            {
+
+            }
+         }
+
+         // a single callback instance is shared by all writer threads
+         final MyIOAsyncTask task = new MyIOAsyncTask();
+
+         class MyRunner implements Runnable
+         {
+            // each runner owns its buffer
+            private final ByteBuffer bb1;
+
+            MyRunner()
+            {
+               bb1 = generateBuffer(recordSize, (byte)'h');
+            }
+
+            public void run()
+            {
+               for (int i = 0; i < its; i++)
+               {
+                  // reset the position so the same record content is written again
+                  bb1.rewind();
+
+                  file.writeDirect(bb1, true, task);
+                  // try
+                  // {
+                  // file.writeDirect(bb1, true);
+                  // }
+                  // catch (Exception e)
+                  // {
+                  // e.printStackTrace();
+                  // }
+               }
+            }
+         }
+
+         Set<Thread> threads = new HashSet<Thread>();
+
+         for (int i = 0; i < numThreads; i++)
+         {
+            MyRunner runner = new MyRunner();
+
+            Thread t = new Thread(runner);
+
+            threads.add(t);
+         }
+
+         // thread creation is excluded from the measurement, thread start-up is included
+         long start = System.currentTimeMillis();
+
+         for (Thread t : threads)
+         {
+            HornetQJournalLogger.LOGGER.startingThread();
+            t.start();
+         }
+
+         for (Thread t : threads)
+         {
+            t.join();
+         }
+
+         // the threads only submit the writes; wait until every one has reported done()
+         latch.await();
+
+         long end = System.currentTimeMillis();
+
+         // records per second across all threads
+         double rate = 1000 * (double)its * numThreads / (end - start);
+
+         double throughput = recordSize * rate;
+
+         System.out.println("For " + numThreads + " threads:");
+         System.out.println("Rate of " + rate + " records per sec");
+         System.out.println("Throughput " + throughput + " bytes per sec");
+         System.out.println("*************");
+      }
+   }
+
+   /**
+    * Flips {@code buffer} and writes its complete content to {@code channel}.
+    *
+    * @param buffer  the buffer to write; expected to be positioned at the end of its content
+    * @param channel the channel to write to
+    * @param size    not used; kept so the signature stays unchanged for existing callers
+    * @throws Exception if the channel write fails
+    */
+   private void write(final ByteBuffer buffer, final FileChannel channel, final int size) throws Exception
+   {
+      buffer.flip();
+
+      // A single write() call may transfer only part of the buffer, so loop until it is drained.
+      while (buffer.hasRemaining())
+      {
+         channel.write(buffer);
+      }
+   }
+
+   private ByteBuffer generateBuffer(final int size, final byte ch)
+   {
+      ByteBuffer bb = ByteBuffer.allocateDirect(size);
+
+      for (int i = 0; i < size; i++)
+      {
+         bb.put(ch);
+      }
+
+      return bb;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java
new file mode 100644
index 0000000..1e9b3d6
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBuffer.java
@@ -0,0 +1,560 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * A TimedBuffer
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class TimedBuffer
+{
+   // Constants -----------------------------------------------------
+
+   // The number of tries on sleep before switching to spin
+   public static final int MAX_CHECKS_ON_SLEEP = 20;
+
+   // Attributes ----------------------------------------------------
+
+   private TimedBufferObserver bufferObserver;
+
+   // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
+   // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
+   // prevent that
+   private final Semaphore spinLimiter = new Semaphore(1);
+
+   private CheckTimer timerRunnable = new CheckTimer();
+
+   private final int bufferSize;
+
+   private final HornetQBuffer buffer;
+
+   private int bufferLimit = 0;
+
+   private List<IOAsyncTask> callbacks;
+
+   private volatile int timeout;
+
+   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+   private volatile boolean pendingSync = false;
+
+   private Thread timerThread;
+
+   private volatile boolean started;
+
+   // We use this flag to prevent a flush from occurring between the calls to checkSize and addBytes.
+   // checkSize must always be followed by its corresponding addBytes, otherwise the buffer
+   // can get into an inconsistent state
+   private boolean delayFlush;
+
+   // for logging write rates
+
+   private final boolean logRates;
+
+   private final AtomicLong bytesFlushed = new AtomicLong(0);
+
+   private final AtomicLong flushesDone = new AtomicLong(0);
+
+   private Timer logRatesTimer;
+
+   private TimerTask logRatesTimerTask;
+
+   private boolean useSleep = true;
+
+   // no need to be volatile as every access is synchronized
+   private boolean spinning = false;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a buffer that is not started yet.
+    *
+    * @param size     capacity in bytes of the internal buffer
+    * @param timeout  stored as the buffer timeout (unit not visible here - TODO confirm)
+    * @param logRates whether a daemon {@link Timer} is created for logging write rates
+    */
+   public TimedBuffer(final int size, final int timeout, final boolean logRates)
+   {
+      this.bufferSize = size;
+
+      this.logRates = logRates;
+
+      this.timeout = timeout;
+
+      // the daemon timer only exists when rates are logged
+      this.logRatesTimer = logRates ? new Timer(true) : null;
+
+      this.buffer = HornetQBuffers.fixedBuffer(size);
+
+      this.buffer.clear();
+
+      this.bufferLimit = 0;
+
+      this.callbacks = new ArrayList<IOAsyncTask>();
+   }
+
+   /**
+    * For debug purposes.
+    *
+    * @return the current value of the {@code useSleep} flag
+    */
+   public synchronized boolean isUseSleep()
+   {
+      return useSleep;
+   }
+
+   /**
+    * Sets the {@code useSleep} flag.
+    *
+    * @param useSleep the new value of the flag
+    */
+   public synchronized void setUseSleep(boolean useSleep)
+   {
+      this.useSleep = useSleep;
+   }
+
+   /**
+    * Starts the buffer: acquires the spin limiter, launches the timer thread and, when rate
+    * logging is enabled, schedules the rate logging task every 2 seconds. Does nothing if the
+    * buffer is already started.
+    *
+    * @throws HornetQInterruptedException if the thread is interrupted while acquiring the spin limiter
+    */
+   public synchronized void start()
+   {
+      if (started)
+      {
+         return;
+      }
+
+      // Need to start with the spin limiter acquired
+      try
+      {
+         spinLimiter.acquire();
+      }
+      catch (InterruptedException e)
+      {
+         throw new HornetQInterruptedException(e);
+      }
+
+      // a fresh runnable is created for every start
+      timerRunnable = new CheckTimer();
+
+      timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
+
+      timerThread.start();
+
+      if (logRates)
+      {
+         logRatesTimerTask = new LogRatesTimerTask();
+
+         // first run after 2000 ms, then every 2000 ms
+         logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
+      }
+
+      started = true;
+   }
+
+   /**
+    * Stops the buffer: flushes it, drops the observer, closes the timer runnable, releases the
+    * spin limiter, cancels the rate logging task (if any) and waits for the timer thread to die.
+    * Does nothing if the buffer is not started.
+    * <p>
+    * NOTE(review): unlike {@code start()}, this method is not synchronized - confirm that this
+    * is intentional.
+    *
+    * @throws HornetQInterruptedException if the thread is interrupted while waiting for the timer thread
+    */
+   public void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      flush();
+
+      bufferObserver = null;
+
+      timerRunnable.close();
+
+      // presumably lets a timer thread blocked on the semaphore wake up and notice the close - TODO confirm
+      spinLimiter.release();
+
+      if (logRates)
+      {
+         logRatesTimerTask.cancel();
+      }
+
+      while (timerThread.isAlive())
+      {
+         try
+         {
+            timerThread.join();
+         }
+         catch (InterruptedException e)
+         {
+            throw new HornetQInterruptedException(e);
+         }
+      }
+
+      started = false;
+   }
+
+   /**
+    * Replaces the observer. If an observer is currently set, the buffer is flushed first,
+    * while the previous observer is still in place.
+    *
+    * @param observer the new observer
+    */
+   public synchronized void setObserver(final TimedBufferObserver observer)
+   {
+      if (bufferObserver != null)
+      {
+         flush();
+      }
+
+      bufferObserver = observer;
+   }
+
+   /**
+    * Verify if the size fits the buffer
+    *
+    * @param sizeChecked
+    */
+   public synchronized boolean checkSize(final int sizeChecked)
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("TimedBuffer is not started");
+      }
+
+      if (sizeChecked > bufferSize)
+      {
+         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
+                                            ") on the journal");
+      }
+
+      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
+      {
+         // Either there is not enough space left in the buffer for the sized record
+         // Or a flush has just been performed and we need to re-calcualate bufferLimit
+
+         flush();
+
+         delayFlush = true;
+
+         final int remainingInFile = bufferObserver.getRemainingBytes();
+
+         if (sizeChecked > remainingInFile)
+         {
+            return false;
+         }
+         else
+         {
+            // There is enough space in the file for this size
+
+            // Need to re-calculate buffer limit
+
+            bufferLimit = Math.min(remainingInFile, bufferSize);
+
+            return true;
+         }
+      }
+      else
+      {
+         delayFlush = true;
+
+         return true;
+      }
+   }
+
+   /**
+    * Adds the content of a buffer by wrapping the array obtained from {@code bytes.toByteBuffer()}
+    * into a {@link ByteArrayEncoding} and delegating to the {@link EncodingSupport} variant.
+    *
+    * @param bytes    the buffer holding the data to add
+    * @param sync     whether a sync is requested for this data
+    * @param callback the callback registered for this data
+    */
+   public synchronized void addBytes(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback)
+   {
+      final byte[] rawData = bytes.toByteBuffer().array();
+
+      final ByteArrayEncoding encoding = new ByteArrayEncoding(rawData);
+
+      addBytes(encoding, sync, callback);
+   }
+
+   /**
+    * Encodes the given data into the buffer and registers its callback.
+    * <p>
+    * Adding data clears {@code delayFlush}; when a sync is requested the pending-sync flag is
+    * raised and spinning is started.
+    *
+    * @param bytes    the data to encode into the buffer
+    * @param sync     whether a sync is requested for this data
+    * @param callback the callback added to the list handed over on the next flush
+    * @throws IllegalStateException if the buffer is not started
+    */
+   public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
+   {
+      if (!started)
+      {
+         throw new IllegalStateException("TimedBuffer is not started");
+      }
+
+      delayFlush = false;
+
+      bytes.encode(buffer);
+
+      callbacks.add(callback);
+
+      if (!sync)
+      {
+         return;
+      }
+
+      pendingSync = true;
+
+      startSpin();
+   }
+
+   /**
+    * Flushes without forcing: equivalent to {@code flush(false)}, so nothing is written while
+    * {@code delayFlush} is set.
+    */
+   public void flush()
+   {
+      flush(false);
+   }
+
+   /**
+    * Hands the buffered bytes, the pending-sync flag and the registered callbacks to the observer
+    * and resets the buffer.
+    * <p>
+    * Nothing happens when the buffer is empty, or when the flush is delayed and not forced.
+    *
+    * @param force {@code true} when the Journal is moving to a new file: any pending write needs
+    *              to be done immediately or data could be lost
+    * @throws IllegalStateException if the buffer is not started
+    */
+   public void flush(final boolean force)
+   {
+      synchronized (this)
+      {
+         if (!started)
+         {
+            throw new IllegalStateException("TimedBuffer is not started");
+         }
+
+         final boolean flushAllowed = force || !delayFlush;
+
+         if (!flushAllowed || buffer.writerIndex() <= 0)
+         {
+            return;
+         }
+
+         final int bytesToFlush = buffer.writerIndex();
+
+         if (logRates)
+         {
+            bytesFlushed.addAndGet(bytesToFlush);
+         }
+
+         final ByteBuffer target = bufferObserver.newBuffer(bufferSize, bytesToFlush);
+
+         // A bulk put of the byte array is much faster on a native buffer than target.put(buffer),
+         // which would append byte by byte. This also transfers the content of this buffer
+         // to the native file's buffer
+         final byte[] content = buffer.toByteBuffer().array();
+
+         target.put(content, 0, bytesToFlush);
+
+         bufferObserver.flushBuffer(target, pendingSync, callbacks);
+
+         stopSpin();
+
+         pendingSync = false;
+
+         // The previous callback list is still being used asynchronously, so a new instance is needed
+         callbacks = new LinkedList<IOAsyncTask>();
+
+         buffer.clear();
+
+         // 0 makes checkSize re-calculate the limit
+         bufferLimit = 0;
+
+         flushesDone.incrementAndGet();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Timer task that logs the write rate and the flush rate observed since its previous
+    * execution, based on the {@code bytesFlushed} and {@code flushesDone} counters.
+    * The first execution only records the reference values.
+    */
+   private class LogRatesTimerTask extends TimerTask
+   {
+      private boolean closed;
+
+      private long lastExecution;
+
+      private long lastBytesFlushed;
+
+      private long lastFlushesDone;
+
+      @Override
+      public synchronized void run()
+      {
+         if (closed)
+         {
+            return;
+         }
+
+         final long currentTime = System.currentTimeMillis();
+
+         final long totalBytes = bytesFlushed.get();
+         final long totalFlushes = flushesDone.get();
+
+         if (lastExecution != 0)
+         {
+            final long elapsedMillis = currentTime - lastExecution;
+
+            // Rates are per second, hence the factor of 1000 over the elapsed milliseconds
+            final double rate = 1000 * (double) (totalBytes - lastBytesFlushed) / elapsedMillis;
+            HornetQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
+
+            final double flushRate = 1000 * (double) (totalFlushes - lastFlushesDone) / elapsedMillis;
+            HornetQJournalLogger.LOGGER.flushRate(flushRate);
+         }
+
+         lastExecution = currentTime;
+         lastBytesFlushed = totalBytes;
+         lastFlushesDone = totalFlushes;
+      }
+
+      /**
+       * Marks the task as closed, so a run still in the queue does nothing, then cancels it.
+       */
+      @Override
+      public synchronized boolean cancel()
+      {
+         closed = true;
+
+         return super.cancel();
+      }
+   }
+
+   /**
+    * Loop executed by the timer thread: while not closed, it flushes the buffer when a sync is
+    * pending, sleeps when sleeping is enabled, and then yields while briefly holding the
+    * spinLimiter permit.
+    */
+   private class CheckTimer implements Runnable
+   {
+      private volatile boolean closed = false;
+
+      // Number of sleep calls measured so far (measuring stops at MAX_CHECKS_ON_SLEEP)
+      int checks = 0;
+      // Number of measured sleeps that took longer than 1.5 times the timeout
+      int failedChecks = 0;
+      // System.nanoTime() taken right before a measured sleep
+      long timeBefore = 0;
+
+      // The timeout split into the millis / nanos pair passed to sleep(int, int)
+      final int sleepMillis = timeout / 1000000; // truncates
+      final int sleepNanos = timeout % 1000000;
+
+
+      public void run()
+      {
+         long lastFlushTime = 0;
+
+         while (!closed)
+         {
+            // We flush on the timer if there are pending syncs there and we've waited at least one
+            // timeout since the time of the last flush.
+            // Effectively flushing "resets" the timer
+            // On the timeout verification, notice that we ignore the timeout check if we are using sleep
+
+            if (pendingSync)
+            {
+               if (isUseSleep())
+               {
+                  // if using sleep, we will always flush
+                  flush();
+                  lastFlushTime = System.nanoTime();
+               }
+               else if (bufferObserver != null && System.nanoTime() - lastFlushTime > timeout)
+               {
+                  // if not using sleep we will spin and do the time checks manually.
+                  // The elapsed time is computed by subtraction: System.nanoTime() values have an
+                  // arbitrary origin and must be compared through their difference to stay
+                  // correct in case of numerical overflow
+                  flush();
+                  lastFlushTime = System.nanoTime();
+               }
+
+            }
+
+            sleepIfPossible();
+
+            try
+            {
+               // Blocks here while stopSpin() holds the permit, i.e. while there is nothing to sync
+               spinLimiter.acquire();
+
+               Thread.yield();
+
+               spinLimiter.release();
+            }
+            catch (InterruptedException e)
+            {
+               throw new HornetQInterruptedException(e);
+            }
+         }
+      }
+
+      /**
+       * We will attempt to use sleep only if the system supports nano-sleep
+       * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
+       * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
+       */
+      private void sleepIfPossible()
+      {
+         if (isUseSleep())
+         {
+            if (checks < MAX_CHECKS_ON_SLEEP)
+            {
+               timeBefore = System.nanoTime();
+            }
+
+            try
+            {
+               sleep(sleepMillis, sleepNanos);
+            }
+            catch (InterruptedException e)
+            {
+               throw new HornetQInterruptedException(e);
+            }
+            catch (Exception e)
+            {
+               // Any other failure of the sleep call disables sleeping for good
+               setUseSleep(false);
+               HornetQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
+            }
+
+            if (checks < MAX_CHECKS_ON_SLEEP)
+            {
+               long realTimeSleep = System.nanoTime() - timeBefore;
+
+               // The real sleep time is allowed to be up to 50% longer than the requested sleep
+               if (realTimeSleep > timeout * 1.5)
+               {
+                  failedChecks++;
+               }
+
+               if (++checks >= MAX_CHECKS_ON_SLEEP)
+               {
+                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5)
+                  {
+                     HornetQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
+                     setUseSleep(false);
+                  }
+               }
+            }
+         }
+      }
+
+      /**
+       * Requests the loop in {@link #run()} to exit on its next iteration.
+       */
+      public void close()
+      {
+         closed = true;
+      }
+   }
+
+   /**
+    * Sleeps for the given time by delegating to {@link Thread#sleep(long, int)}.
+    * <p>
+    * Sub classes (tests basically) can use this to override how the sleep is being done
+    *
+    * @param sleepMillis the number of milliseconds to sleep
+    * @param sleepNanos the additional number of nanoseconds to sleep
+    * @throws InterruptedException if the thread is interrupted while sleeping
+    */
+   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException
+   {
+      Thread.sleep(sleepMillis, sleepNanos);
+   }
+
+   /**
+    * Stops the spinning of the timer thread, if it is currently spinning, by taking the
+    * spinLimiter permit.
+    * <p>
+    * Sub classes (tests basically) can use this to override disabling spinning
+    */
+   protected void stopSpin()
+   {
+      if (!spinning)
+      {
+         return;
+      }
+
+      try
+      {
+         // Holding the spinLimiter permit prevents the timer flush thread from unnecessarily
+         // spinning when the buffer is inactive
+         spinLimiter.acquire();
+      }
+      catch (InterruptedException e)
+      {
+         throw new HornetQInterruptedException(e);
+      }
+
+      spinning = false;
+   }
+
+
+   /**
+    * Starts the spinning of the timer thread, if it is not spinning yet, by releasing the
+    * spinLimiter permit.
+    * <p>
+    * Sub classes (tests basically) can use this to override enabling spinning
+    */
+   protected void startSpin()
+   {
+      if (spinning)
+      {
+         return;
+      }
+
+      spinLimiter.release();
+
+      spinning = true;
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java
new file mode 100644
index 0000000..6560ac7
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TimedBufferObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.activemq6.core.journal.IOAsyncTask;
+
+/**
+ * A TimedBufferObserver: the receiver of the data flushed by a TimedBuffer.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TimedBufferObserver
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Receives a buffer to be written.
+    * TimedBuffer.flush passes the buffer it obtained from {@link #newBuffer(int, int)} after
+    * filling it, its pending-sync flag and the callbacks registered for the flushed data.
+    *
+    * @param buffer the data to write
+    * @param syncRequested whether a sync was requested for any of the data
+    * @param callbacks the callbacks associated with the data
+    */
+   void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOAsyncTask> callbacks);
+
+   /** Return the number of remaining bytes that still fit on the observer (file) */
+   int getRemainingBytes();
+
+   /**
+    * Provides the buffer the flushed data is copied into.
+    * TimedBuffer.flush calls this with its buffer size and the number of bytes about to be flushed.
+    *
+    * @param size the buffer size (presumably the capacity to allocate - TODO confirm against implementations)
+    * @param limit the number of bytes that will be put into the buffer
+    * @return the buffer to fill
+    */
+   ByteBuffer newBuffer(int size, int limit);
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java
new file mode 100644
index 0000000..ce3c157
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/TransactionCallback.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.utils.ReusableLatch;
+
+/**
+ * A TransactionCallback: an {@link IOAsyncTask} that counts the operations registered through
+ * {@link #countUp()} and the ones completed through {@link #done()}, lets a caller wait for
+ * all of them, and notifies an optional delegate once the completed count reaches the
+ * registered count.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TransactionCallback implements IOAsyncTask
+{
+   private final ReusableLatch countLatch = new ReusableLatch();
+
+   private volatile String errorMessage = null;
+
+   private volatile int errorCode = 0;
+
+   private final AtomicInteger up = new AtomicInteger();
+
+   // Atomic counter: an increment on a volatile int is a non-atomic read-modify-write and
+   // could lose completions reported from different threads
+   private final AtomicInteger done = new AtomicInteger();
+
+   private volatile IOAsyncTask delegateCompletion;
+
+   /**
+    * Registers one more operation to wait for.
+    */
+   public void countUp()
+   {
+      up.incrementAndGet();
+      countLatch.countUp();
+   }
+
+   /**
+    * Reports the completion of one operation; when the completed count reaches the registered
+    * count and a delegate is set, the delegate is cleared and notified.
+    */
+   public void done()
+   {
+      countLatch.countDown();
+      if (done.incrementAndGet() == up.get())
+      {
+         final IOAsyncTask delegateToCall = delegateCompletion;
+         if (delegateToCall != null)
+         {
+            // We need to set the delegateCompletion to null first or blocking commits could miss a callback
+            // What would affect mainly tests
+            delegateCompletion = null;
+            delegateToCall.done();
+         }
+      }
+   }
+
+   /**
+    * Waits until every registered operation was reported as done or failed.
+    *
+    * @throws InterruptedException if the wait is interrupted
+    * @throws IllegalStateException if an error with a non-null message was reported through {@link #onError(int, String)}
+    */
+   public void waitCompletion() throws InterruptedException
+   {
+      countLatch.await();
+
+      if (errorMessage != null)
+      {
+         throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
+      }
+   }
+
+   /**
+    * Records the error, counts the operation as finished and forwards the error to the delegate, if any.
+    *
+    * @param errorCode the error code
+    * @param errorMessage the error message
+    */
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      this.errorMessage = errorMessage;
+
+      this.errorCode = errorCode;
+
+      countLatch.countDown();
+
+      // Read the volatile field once: done() may set it to null between a check and a call
+      final IOAsyncTask delegateToCall = delegateCompletion;
+
+      if (delegateToCall != null)
+      {
+         delegateToCall.onError(errorCode, errorMessage);
+      }
+   }
+
+   /**
+    * @return the delegateCompletion
+    */
+   public IOAsyncTask getDelegateCompletion()
+   {
+      return delegateCompletion;
+   }
+
+   /**
+    * @param delegateCompletion the delegateCompletion to set
+    */
+   public void setDelegateCompletion(final IOAsyncTask delegateCompletion)
+   {
+      this.delegateCompletion = delegateCompletion;
+   }
+
+   /**
+    * @return the errorMessage
+    */
+   public String getErrorMessage()
+   {
+      return errorMessage;
+   }
+
+   /**
+    * @return the errorCode
+    */
+   public int getErrorCode()
+   {
+      return errorCode;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java
new file mode 100644
index 0000000..acb232b
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/ByteArrayEncoding.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+
+/**
+ * A ByteArrayEncoding: an {@link EncodingSupport} whose encoded form is a given byte array.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ByteArrayEncoding implements EncodingSupport
+{
+
+   // The bytes written by encode; the array is kept by reference, not copied
+   final byte[] data;
+
+   /**
+    * @param data the bytes to be written when this object is encoded
+    */
+   public ByteArrayEncoding(final byte[] data)
+   {
+      this.data = data;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Not supported.
+    *
+    * @throws IllegalStateException always
+    */
+   public void decode(final HornetQBuffer buffer)
+   {
+      throw new IllegalStateException("operation not supported");
+   }
+
+   /**
+    * Writes the whole byte array into the buffer.
+    */
+   public void encode(final HornetQBuffer buffer)
+   {
+      buffer.writeBytes(data);
+   }
+
+   /**
+    * @return the length of the byte array
+    */
+   public int getEncodeSize()
+   {
+      return data.length;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java
new file mode 100644
index 0000000..d62eca7
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/dataformat/JournalAddRecord.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl.dataformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecord: the journal record written for an add or for an update of a record.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecord extends JournalInternalRecord
+{
+
+   private final boolean add;
+
+   private final long id;
+
+   private final byte recordType;
+
+   private final EncodingSupport record;
+
+   /**
+    * @param add {@code true} to encode an add record, {@code false} to encode an update record
+    * @param id the record id
+    * @param recordType the record type byte
+    * @param record the body of the record
+    */
+   public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record)
+   {
+      this.add = add;
+
+      this.id = id;
+
+      this.recordType = recordType;
+
+      this.record = record;
+   }
+
+   /**
+    * Writes, in this order: the add/update marker byte, the file id, the compact count, the
+    * record id, the body size, the record type, the body and finally the total encode size.
+    */
+   @Override
+   public void encode(final HornetQBuffer buffer)
+   {
+      if (!add)
+      {
+         buffer.writeByte(JournalImpl.UPDATE_RECORD);
+      }
+      else
+      {
+         buffer.writeByte(JournalImpl.ADD_RECORD);
+      }
+
+      // Header
+      buffer.writeInt(fileID);
+      buffer.writeByte(compactCount);
+      buffer.writeLong(id);
+      buffer.writeInt(record.getEncodeSize());
+      buffer.writeByte(recordType);
+
+      // Body
+      record.encode(buffer);
+
+      // Trailer: the total size of the encoded record
+      buffer.writeInt(getEncodeSize());
+   }
+
+   /**
+    * @return the fixed add-record size plus the body size plus one byte
+    */
+   @Override
+   public int getEncodeSize()
+   {
+      final int bodySize = record.getEncodeSize();
+
+      return JournalImpl.SIZE_ADD_RECORD + bodySize + 1;
+   }
+}
\ No newline at end of file


Mime
View raw message