activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [07/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:37 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalImpl.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalImpl.java
new file mode 100644
index 0000000..b54b004
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalImpl.java
@@ -0,0 +1,3252 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.journal.IOCompletion;
+import org.apache.activemq6.core.journal.JournalLoadInformation;
+import org.apache.activemq6.core.journal.LoaderCallback;
+import org.apache.activemq6.core.journal.PreparedTransactionInfo;
+import org.apache.activemq6.core.journal.RecordInfo;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.core.journal.TestableJournal;
+import org.apache.activemq6.core.journal.TransactionFailureCallback;
+import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX.TX_RECORD_TYPE;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalRollbackRecordTX;
+import org.apache.activemq6.journal.HornetQJournalBundle;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+import org.apache.activemq6.utils.ConcurrentHashSet;
+import org.apache.activemq6.utils.DataConstants;
+
+/**
+ * <p>A circular log implementation.</p
+ * <p/>
+ * <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ */
+public class JournalImpl extends JournalBase implements TestableJournal, JournalRecordProvider
+{
+
+   // Constants -----------------------------------------------------
+
+   public static final int FORMAT_VERSION = 2;
+
+   private static final int[] COMPATIBLE_VERSIONS = new int[]{1};
+
+   // Static --------------------------------------------------------
+   private static final boolean trace = HornetQJournalLogger.LOGGER.isTraceEnabled();
+
+   // This is useful at debug time...
+   // if you set it to true, all the appends, deletes, rollbacks, commits, etc.. are sent to System.out
+   private static final boolean TRACE_RECORDS = trace;
+
+   // This method exists just to make debug easier.
+   // I could replace log.trace by log.info temporarily while I was debugging
+   // Journal
+   private static void trace(final String message)
+   {
+      HornetQJournalLogger.LOGGER.trace(message);
+   }
+
+   private static void traceRecord(final String message)
+   {
+      HornetQJournalLogger.LOGGER.trace(message);
+   }
+
+   // The sizes of primitive types
+
+   public static final int MIN_FILE_SIZE = 1024;
+
+   // FileID(Long) + JournalVersion + UserVersion
+   public static final int SIZE_HEADER = DataConstants.SIZE_LONG + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
+
+   private static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
+
+   public static final int SIZE_ADD_RECORD = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG +
+      DataConstants.SIZE_BYTE +
+      DataConstants.SIZE_INT /* + record.length */;
+
+   // Record markers - they must be all unique
+
+   public static final byte ADD_RECORD = 11;
+
+   public static final byte UPDATE_RECORD = 12;
+
+   public static final int SIZE_ADD_RECORD_TX = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG +
+      DataConstants.SIZE_BYTE +
+      DataConstants.SIZE_LONG +
+      DataConstants.SIZE_INT /* + record.length */;
+
+   public static final byte ADD_RECORD_TX = 13;
+
+   public static final byte UPDATE_RECORD_TX = 14;
+
+   public static final int SIZE_DELETE_RECORD_TX = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG +
+      DataConstants.SIZE_LONG +
+      DataConstants.SIZE_INT /* + record.length */;
+
+   public static final byte DELETE_RECORD_TX = 15;
+
+   public static final int SIZE_DELETE_RECORD = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG;
+
+   public static final byte DELETE_RECORD = 16;
+
+   public static final int SIZE_COMPLETE_TRANSACTION_RECORD = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG +
+      DataConstants.SIZE_INT;
+
+   public static final int SIZE_PREPARE_RECORD = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + DataConstants.SIZE_INT;
+
+   public static final byte PREPARE_RECORD = 17;
+
+   public static final int SIZE_COMMIT_RECORD = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+
+   public static final byte COMMIT_RECORD = 18;
+
+   public static final int SIZE_ROLLBACK_RECORD = JournalImpl.BASIC_SIZE + DataConstants.SIZE_LONG;
+
+   public static final byte ROLLBACK_RECORD = 19;
+
+   protected static final byte FILL_CHARACTER = (byte) 'J';
+
+   // Attributes ----------------------------------------------------
+
+   private volatile boolean autoReclaim = true;
+
+   private final int userVersion;
+
+   private final int minFiles;
+
+   private final float compactPercentage;
+
+   private final int compactMinFiles;
+
+   private final SequentialFileFactory fileFactory;
+
+   private final JournalFilesRepository filesRepository;
+
+   // Compacting may replace this structure
+   private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
+
+   // Compacting may replace this structure
+   private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
+
+   // This will be set only while the JournalCompactor is being executed
+   private volatile JournalCompactor compactor;
+
+   private final AtomicBoolean compactorRunning = new AtomicBoolean();
+
+   private ExecutorService filesExecutor = null;
+
+   private ExecutorService compactorExecutor = null;
+
+   private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<CountDownLatch>();
+
+   // Lock used during the append of records
+   // This lock doesn't represent a global lock.
+   // After a record is appended, the usedFile can't be changed until the positives and negatives are updated
+   private final ReentrantLock lockAppend = new ReentrantLock();
+
+   /**
+    * We don't lock the journal during the whole compacting operation. During compacting we only
+    * lock it (i) when gathering the initial structure, and (ii) when replicating the structures
+    * after finished compacting.
+    * <p/>
+    * However we need to lock it while taking and updating snapshots
+    */
+   private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+   private final ReadWriteLock compactorLock = new ReentrantReadWriteLock();
+
+   private volatile JournalFile currentFile;
+
+   private volatile JournalState state = JournalState.STOPPED;
+
+   private final Reclaimer reclaimer = new Reclaimer();
+
+   // Constructors --------------------------------------------------
+
+   public JournalImpl(final int fileSize,
+                      final int minFiles,
+                      final int compactMinFiles,
+                      final int compactPercentage,
+                      final SequentialFileFactory fileFactory,
+                      final String filePrefix,
+                      final String fileExtension,
+                      final int maxAIO)
+   {
+      this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
+   }
+
+
+   public JournalImpl(final int fileSize,
+                      final int minFiles,
+                      final int compactMinFiles,
+                      final int compactPercentage,
+                      final SequentialFileFactory fileFactory,
+                      final String filePrefix,
+                      final String fileExtension,
+                      final int maxAIO, final int userVersion)
+   {
+      super(fileFactory.isSupportsCallbacks(), fileSize);
+      if (fileSize % fileFactory.getAlignment() != 0)
+      {
+         throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
+                                               fileFactory.getAlignment());
+      }
+      if (minFiles < 2)
+      {
+         throw new IllegalArgumentException("minFiles cannot be less than 2");
+      }
+      if (compactPercentage < 0 || compactPercentage > 100)
+      {
+         throw new IllegalArgumentException("Compact Percentage out of range");
+      }
+
+      if (compactPercentage == 0)
+      {
+         this.compactPercentage = 0;
+      }
+      else
+      {
+         this.compactPercentage = compactPercentage / 100f;
+      }
+
+      this.compactMinFiles = compactMinFiles;
+      this.minFiles = minFiles;
+
+      this.fileFactory = fileFactory;
+
+      filesRepository = new JournalFilesRepository(fileFactory,
+                                                   this,
+                                                   filePrefix,
+                                                   fileExtension,
+                                                   userVersion,
+                                                   maxAIO,
+                                                   fileSize,
+                                                   minFiles);
+
+      this.userVersion = userVersion;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
+   }
+
+   public void runDirectJournalBlast() throws Exception
+   {
+      final int numIts = 100000000;
+
+      HornetQJournalLogger.LOGGER.runningJournalBlast(numIts);
+
+      final CountDownLatch latch = new CountDownLatch(numIts * 2);
+
+      class MyIOAsyncTask implements IOCompletion
+      {
+         public void done()
+         {
+            latch.countDown();
+         }
+
+         public void onError(final int errorCode, final String errorMessage)
+         {
+
+         }
+
+         public void storeLineUp()
+         {
+         }
+      }
+
+      final MyIOAsyncTask task = new MyIOAsyncTask();
+
+      final int recordSize = 1024;
+
+      final byte[] bytes = new byte[recordSize];
+
+      class MyRecord implements EncodingSupport
+      {
+
+         public void decode(final HornetQBuffer buffer)
+         {
+         }
+
+         public void encode(final HornetQBuffer buffer)
+         {
+            buffer.writeBytes(bytes);
+         }
+
+         public int getEncodeSize()
+         {
+            return recordSize;
+         }
+
+      }
+
+      MyRecord record = new MyRecord();
+
+      for (int i = 0; i < numIts; i++)
+      {
+         appendAddRecord(i, (byte) 1, record, true, task);
+         appendDeleteRecord(i, true, task);
+      }
+
+      latch.await();
+   }
+
+   public Map<Long, JournalRecord> getRecords()
+   {
+      return records;
+   }
+
+   public JournalFile getCurrentFile()
+   {
+      return currentFile;
+   }
+
+   public JournalCompactor getCompactor()
+   {
+      return compactor;
+   }
+
+   /**
+    * this method is used internally only however tools may use it to maintenance.
+    * It won't be part of the interface as the tools should be specific to the implementation
+    */
+   public List<JournalFile> orderFiles() throws Exception
+   {
+      List<String> fileNames = fileFactory.listFiles(filesRepository.getFileExtension());
+
+      List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+
+      for (String fileName : fileNames)
+      {
+         SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
+
+         file.open(1, false);
+
+         try
+         {
+
+            JournalFileImpl jrnFile = readFileHeader(file);
+
+            orderedFiles.add(jrnFile);
+         }
+         finally
+         {
+            file.close();
+         }
+      }
+
+      // Now order them by ordering id - we can't use the file name for ordering
+      // since we can re-use dataFiles
+
+      Collections.sort(orderedFiles, new JournalFileComparator());
+
+      return orderedFiles;
+   }
+
+   /**
+    * this method is used internally only however tools may use it to maintenance.
+    */
+   public static int readJournalFile(final SequentialFileFactory fileFactory,
+                                     final JournalFile file,
+                                     final JournalReaderCallback reader) throws Exception
+   {
+      file.getFile().open(1, false);
+      ByteBuffer wholeFileBuffer = null;
+      try
+      {
+         final int filesize = (int) file.getFile().size();
+
+         wholeFileBuffer = fileFactory.newBuffer(filesize);
+
+         final int journalFileSize = file.getFile().read(wholeFileBuffer);
+
+         if (journalFileSize != filesize)
+         {
+            throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
+         }
+
+         // First long is the ordering timestamp, we just jump its position
+         wholeFileBuffer.position(JournalImpl.SIZE_HEADER);
+
+         int lastDataPos = JournalImpl.SIZE_HEADER;
+
+         while (wholeFileBuffer.hasRemaining())
+         {
+            final int pos = wholeFileBuffer.position();
+
+            byte recordType = wholeFileBuffer.get();
+
+            if (recordType < JournalImpl.ADD_RECORD || recordType > JournalImpl.ROLLBACK_RECORD)
+            {
+               // I - We scan for any valid record on the file. If a hole
+               // happened on the middle of the file we keep looking until all
+               // the possibilities are gone
+               continue;
+            }
+
+            if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+            {
+               reader.markAsDataFile(file);
+
+               wholeFileBuffer.position(pos + 1);
+               // II - Ignore this record, let's keep looking
+               continue;
+            }
+
+            // III - Every record has the file-id.
+            // This is what supports us from not re-filling the whole file
+            int readFileId = wholeFileBuffer.getInt();
+
+            // This record is from a previous file-usage. The file was
+            // reused and we need to ignore this record
+            if (readFileId != file.getRecordID())
+            {
+               wholeFileBuffer.position(pos + 1);
+               continue;
+            }
+
+            short compactCount = 0;
+
+            if (file.getJournalVersion() >= 2)
+            {
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
+               {
+                  reader.markAsDataFile(file);
+
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
+
+               compactCount = wholeFileBuffer.get();
+            }
+
+            long transactionID = 0;
+
+            if (JournalImpl.isTransaction(recordType))
+            {
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               transactionID = wholeFileBuffer.getLong();
+            }
+
+            long recordID = 0;
+
+            // If prepare or commit
+            if (!JournalImpl.isCompleteTransaction(recordType))
+            {
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               recordID = wholeFileBuffer.getLong();
+            }
+
+            // We use the size of the record to validate the health of the
+            // record.
+            // (V) We verify the size of the record
+
+            // The variable record portion used on Updates and Appends
+            int variableSize = 0;
+
+            // Used to hold extra data on transaction prepares
+            int preparedTransactionExtraDataSize = 0;
+
+            byte userRecordType = 0;
+
+            byte[] record = null;
+
+            if (JournalImpl.isContainsBody(recordType))
+            {
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               variableSize = wholeFileBuffer.getInt();
+
+               if (recordType != JournalImpl.DELETE_RECORD_TX)
+               {
+                  if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1))
+                  {
+                     wholeFileBuffer.position(pos + 1);
+                     continue;
+                  }
+
+                  userRecordType = wholeFileBuffer.get();
+               }
+
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
+
+               record = new byte[variableSize];
+
+               wholeFileBuffer.get(record);
+            }
+
+            // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
+            // currentFile
+            int transactionCheckNumberOfRecords = 0;
+
+            if (recordType == JournalImpl.PREPARE_RECORD || recordType == JournalImpl.COMMIT_RECORD)
+            {
+               if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
+
+               transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
+
+               if (recordType == JournalImpl.PREPARE_RECORD)
+               {
+                  if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+                  {
+                     wholeFileBuffer.position(pos + 1);
+                     continue;
+                  }
+                  // Add the variable size required for preparedTransactions
+                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+               }
+               variableSize = 0;
+            }
+
+            int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());
+
+            // VI - this is completing V, We will validate the size at the end
+            // of the record,
+            // But we avoid buffer overflows by damaged data
+            if (JournalImpl.isInvalidSize(journalFileSize, pos, recordSize + variableSize +
+               preparedTransactionExtraDataSize))
+            {
+               // Avoid a buffer overflow caused by damaged data... continue
+               // scanning for more pendingTransactions...
+               JournalImpl.trace("Record at position " + pos +
+                                    " recordType = " +
+                                    recordType +
+                                    " file:" +
+                                    file.getFile().getFileName() +
+                                    " recordSize: " +
+                                    recordSize +
+                                    " variableSize: " +
+                                    variableSize +
+                                    " preparedTransactionExtraDataSize: " +
+                                    preparedTransactionExtraDataSize +
+                                    " is corrupted and it is being ignored (II)");
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
+               wholeFileBuffer.position(pos + 1);
+
+               continue;
+            }
+
+            int oldPos = wholeFileBuffer.position();
+
+            wholeFileBuffer.position(pos + variableSize +
+                                        recordSize +
+                                        preparedTransactionExtraDataSize -
+                                        DataConstants.SIZE_INT);
+
+            int checkSize = wholeFileBuffer.getInt();
+
+            // VII - The checkSize at the end has to match with the size
+            // informed at the beginning.
+            // This is like testing a hash for the record. (We could replace the
+            // checkSize by some sort of calculated hash)
+            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+            {
+               JournalImpl.trace("Record at position " + pos +
+                                    " recordType = " +
+                                    recordType +
+                                    " possible transactionID = " +
+                                    transactionID +
+                                    " possible recordID = " +
+                                    recordID +
+                                    " file:" +
+                                    file.getFile().getFileName() +
+                                    " is corrupted and it is being ignored (III)");
+
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
+
+               wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
+
+               continue;
+            }
+
+            wholeFileBuffer.position(oldPos);
+
+            // At this point everything is checked. So we relax and just load
+            // the data now.
+
+            switch (recordType)
+            {
+               case ADD_RECORD:
+               {
+                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
+                  break;
+               }
+
+               case UPDATE_RECORD:
+               {
+                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
+                  break;
+               }
+
+               case DELETE_RECORD:
+               {
+                  reader.onReadDeleteRecord(recordID);
+                  break;
+               }
+
+               case ADD_RECORD_TX:
+               {
+                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID,
+                                                                         userRecordType,
+                                                                         record,
+                                                                         false,
+                                                                         compactCount));
+                  break;
+               }
+
+               case UPDATE_RECORD_TX:
+               {
+                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID,
+                                                                            userRecordType,
+                                                                            record,
+                                                                            true,
+                                                                            compactCount));
+                  break;
+               }
+
+               case DELETE_RECORD_TX:
+               {
+                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID,
+                                                                            (byte) 0,
+                                                                            record,
+                                                                            true,
+                                                                            compactCount));
+                  break;
+               }
+
+               case PREPARE_RECORD:
+               {
+
+                  byte[] extraData = new byte[preparedTransactionExtraDataSize];
+
+                  wholeFileBuffer.get(extraData);
+
+                  reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+
+                  break;
+               }
+               case COMMIT_RECORD:
+               {
+
+                  reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
+                  break;
+               }
+               case ROLLBACK_RECORD:
+               {
+                  reader.onReadRollbackRecord(transactionID);
+                  break;
+               }
+               default:
+               {
+                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+                                                     " is corrupt, invalid record type " +
+                                                     recordType);
+               }
+            }
+
+            checkSize = wholeFileBuffer.getInt();
+
+            // This is a sanity check about the loading code itself.
+            // If this checkSize doesn't match, it means the reading method is
+            // not doing what it was supposed to do
+            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+            {
+               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+                                                  ", pos = " +
+                                                  pos);
+            }
+
+            lastDataPos = wholeFileBuffer.position();
+
+         }
+
+         return lastDataPos;
+      }
+      catch (Throwable e)
+      {
+         HornetQJournalLogger.LOGGER.errorReadingFile(e);
+         throw new Exception(e.getMessage(), e);
+      }
+      finally
+      {
+         if (wholeFileBuffer != null)
+         {
+            fileFactory.releaseBuffer(wholeFileBuffer);
+         }
+
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   // Journal implementation
+   // ----------------------------------------------------------------
+
+   @Override
+   public void appendAddRecord(final long id,
+                               final byte recordType,
+                               final EncodingSupport record,
+                               final boolean sync,
+                               final IOCompletion callback) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendAddRecord::id=" + id +
+                                          ", userRecordType=" +
+                                          recordType +
+                                          ", usedFile = " +
+                                          usedFile);
+            }
+
+            records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   @Override
+   public void appendUpdateRecord(final long id,
+                                  final byte recordType,
+                                  final EncodingSupport record,
+                                  final boolean sync,
+                                  final IOCompletion callback) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalRecord jrnRecord = records.get(id);
+
+         if (jrnRecord == null)
+         {
+            if (!(compactor != null && compactor.lookupRecord(id)))
+            {
+               throw new IllegalStateException("Cannot find add info " + id);
+            }
+         }
+
+         JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
+                                          ", userRecordType=" +
+                                          recordType +
+                                          ", usedFile = " +
+                                          usedFile);
+            }
+
+            // record== null here could only mean there is a compactor, and computing the delete should be done after
+            // compacting is done
+            if (jrnRecord == null)
+            {
+               compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+            }
+            else
+            {
+               jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
+            }
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+
+   @Override
+   public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+      try
+      {
+
+         JournalRecord record = null;
+
+         if (compactor == null)
+         {
+            record = records.remove(id);
+
+            if (record == null)
+            {
+               throw new IllegalStateException("Cannot find add info " + id);
+            }
+         }
+         else
+         {
+            if (!records.containsKey(id) && !compactor.lookupRecord(id))
+            {
+               throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
+            }
+         }
+
+         JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+            }
+
+            // record== null here could only mean there is a compactor, and computing the delete should be done after
+            // compacting is done
+            if (record == null)
+            {
+               compactor.addCommandDelete(id, usedFile);
+            }
+            else
+            {
+               record.delete(usedFile);
+            }
+
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   @Override
+   public void appendAddRecordTransactional(final long txID,
+                                            final long id,
+                                            final byte recordType,
+                                            final EncodingSupport record) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+
+         JournalTransaction tx = getTransactionInfo(txID);
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
+                                          ",id=" +
+                                          id +
+                                          ", userRecordType=" +
+                                          recordType +
+                                          ", usedFile = " +
+                                          usedFile);
+            }
+
+            tx.addPositive(usedFile, id, addRecord.getEncodeSize());
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   private void checkJournalIsLoaded()
+   {
+      if (state != JournalState.LOADED && state != JournalState.SYNCING)
+      {
+         throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED + ", was [" + state + "]");
+      }
+   }
+
+   private void setJournalState(JournalState newState)
+   {
+      state = newState;
+   }
+
+   @Override
+   public void appendUpdateRecordTransactional(final long txID,
+                                               final long id,
+                                               final byte recordType,
+                                               final EncodingSupport record) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+
+         JournalTransaction tx = getTransactionInfo(txID);
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
+                                          ",id=" +
+                                          id +
+                                          ", userRecordType=" +
+                                          recordType +
+                                          ", usedFile = " +
+                                          usedFile);
+            }
+
+            tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+
+   @Override
+   public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+
+         JournalTransaction tx = getTransactionInfo(txID);
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendDeleteRecordTransactional::txID=" + txID +
+                                          ", id=" +
+                                          id +
+                                          ", usedFile = " +
+                                          usedFile);
+            }
+
+            tx.addNegative(usedFile, id);
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   /**
+    * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
+    * back to a state it could be committed. </p>
+    * <p/>
+    * <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
+    * <p/>
+    * <p> This method also uses the same logic applied on {@link JournalImpl#appendCommitRecord(long, boolean)}
+    *
+    * @param txID
+    * @param transactionData extra user data for the prepare
+    * @throws Exception
+    */
+   @Override
+   public void appendPrepareRecord(final long txID,
+                                   final EncodingSupport transactionData,
+                                   final boolean sync,
+                                   final IOCompletion callback) throws Exception
+   {
+
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalTransaction tx = getTransactionInfo(txID);
+
+         JournalInternalRecord prepareRecord =
+            new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+            }
+
+            tx.prepare(usedFile);
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   @Override
+   public void lineUpContext(IOCompletion callback)
+   {
+      callback.storeLineUp();
+   }
+
+
+   /**
+    * Regarding the number of operations in a given file see {@link JournalCompleteRecordTX}.
+    */
+   @Override
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      try
+      {
+         JournalTransaction tx = transactions.remove(txID);
+
+         if (tx == null)
+         {
+            throw new IllegalStateException("Cannot find tx with id " + txID);
+         }
+
+         JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
+
+         if (callback != null && lineUpContext)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+
+            if (JournalImpl.TRACE_RECORDS)
+            {
+               JournalImpl.traceRecord("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+            }
+
+            tx.commit(usedFile);
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   @Override
+   public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      checkJournalIsLoaded();
+
+      journalLock.readLock().lock();
+
+      JournalTransaction tx = null;
+
+      try
+      {
+         tx = transactions.remove(txID);
+
+         if (tx == null)
+         {
+            throw new IllegalStateException("Cannot find tx with id " + txID);
+         }
+
+         JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
+
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
+
+            tx.rollback(usedFile);
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   // XXX make it protected?
+   public int getAlignment() throws Exception
+   {
+      return fileFactory.getAlignment();
+   }
+
+   private static final class DummyLoader implements LoaderCallback
+   {
+      static final LoaderCallback INSTANCE = new DummyLoader();
+
+      public void failedTransaction(final long transactionID, final List<RecordInfo> records,
+                                    final List<RecordInfo> recordsToDelete)
+      {
+      }
+
+      public void updateRecord(final RecordInfo info)
+      {
+      }
+
+      public void deleteRecord(final long id)
+      {
+      }
+
+      public void addRecord(final RecordInfo info)
+      {
+      }
+
+      public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
+      {
+      }
+   }
+
+   public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+   {
+      return load(DummyLoader.INSTANCE, true, null);
+   }
+
+   public synchronized JournalLoadInformation loadSyncOnly(JournalState syncState) throws Exception
+   {
+      assert syncState == JournalState.SYNCING || syncState == JournalState.SYNCING_UP_TO_DATE;
+      return load(DummyLoader.INSTANCE, true, syncState);
+   }
+
+   public JournalLoadInformation load(final List<RecordInfo> committedRecords,
+                                      final List<PreparedTransactionInfo> preparedTransactions,
+                                      final TransactionFailureCallback failureCallback) throws Exception
+   {
+      return load(committedRecords, preparedTransactions, failureCallback, true);
+   }
+
+   /**
+    * @see JournalImpl#load(LoaderCallback)
+    */
+   public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
+                                                   final List<PreparedTransactionInfo> preparedTransactions,
+                                                   final TransactionFailureCallback failureCallback,
+                                                   final boolean fixBadTX) throws Exception
+   {
+      final Set<Long> recordsToDelete = new HashSet<Long>();
+      // ArrayList was taking too long to delete elements on checkDeleteSize
+      final List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      final int DELETE_FLUSH = 20000;
+
+      JournalLoadInformation info = load(new LoaderCallback()
+      {
+         Runtime runtime = Runtime.getRuntime();
+
+         private void checkDeleteSize()
+         {
+            // HORNETQ-482 - Flush deletes only if memory is critical
+            if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
+            {
+               HornetQJournalLogger.LOGGER.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+               // Clean up when the list is too large, or it won't be possible to load large sets of files
+               // Done as part of JBMESSAGING-1678
+               Iterator<RecordInfo> iter = records.iterator();
+               while (iter.hasNext())
+               {
+                  RecordInfo record = iter.next();
+
+                  if (recordsToDelete.contains(record.id))
+                  {
+                     iter.remove();
+                  }
+               }
+
+               recordsToDelete.clear();
+
+               HornetQJournalLogger.LOGGER.debug("flush delete done");
+            }
+         }
+
+         public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
+         {
+            preparedTransactions.add(preparedTransaction);
+            checkDeleteSize();
+         }
+
+         public void addRecord(final RecordInfo info)
+         {
+            records.add(info);
+            checkDeleteSize();
+         }
+
+         public void updateRecord(final RecordInfo info)
+         {
+            records.add(info);
+            checkDeleteSize();
+         }
+
+         public void deleteRecord(final long id)
+         {
+            recordsToDelete.add(id);
+            checkDeleteSize();
+         }
+
+         public void failedTransaction(final long transactionID,
+                                       final List<RecordInfo> records,
+                                       final List<RecordInfo> recordsToDelete)
+         {
+            if (failureCallback != null)
+            {
+               failureCallback.failedTransaction(transactionID, records, recordsToDelete);
+            }
+         }
+      }, fixBadTX, null);
+
+      for (RecordInfo record : records)
+      {
+         if (!recordsToDelete.contains(record.id))
+         {
+            committedRecords.add(record);
+         }
+      }
+
+      return info;
+   }
+
+
+   public void scheduleCompactAndBlock(int timeout) throws Exception
+   {
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      final CountDownLatch latch = newLatch(1);
+
+      compactorRunning.set(true);
+
+      // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+      // operations (that will use the executor)
+      compactorExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+
+            try
+            {
+               JournalImpl.this.compact();
+            }
+            catch (Throwable e)
+            {
+               errors.incrementAndGet();
+               HornetQJournalLogger.LOGGER.errorCompacting(e);
+               e.printStackTrace();
+            }
+            finally
+            {
+               latch.countDown();
+            }
+         }
+      });
+
+      try
+      {
+
+         awaitLatch(latch, timeout);
+
+         if (errors.get() > 0)
+         {
+            throw new RuntimeException("Error during compact, look at the logs");
+         }
+      }
+      finally
+      {
+         compactorRunning.set(false);
+      }
+   }
+
+   /**
+    * Note: This method can't be called from the main executor, as it will invoke other methods
+    * depending on it.
+    * <p/>
+    * Note: only synchronized methods on journal are methods responsible for the life-cycle such as
+    * stop, start records will still come as this is being executed
+    */
+   protected synchronized void compact() throws Exception
+   {
+      if (compactor != null)
+      {
+         throw new IllegalStateException("There is pending compacting operation");
+      }
+
+      compactorLock.writeLock().lock();
+      try
+      {
+         ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(filesRepository.getDataFilesCount());
+
+         boolean previousReclaimValue = isAutoReclaim();
+
+         try
+         {
+            HornetQJournalLogger.LOGGER.debug("Starting compacting operation on journal");
+
+            onCompactStart();
+
+            // We need to guarantee that the journal is frozen for this short time
+            // We don't freeze the journal as we compact, only for the short time where we replace records
+            journalLock.writeLock().lock();
+            try
+            {
+               if (state != JournalState.LOADED)
+               {
+                  return;
+               }
+
+               onCompactLockingTheJournal();
+
+               setAutoReclaim(false);
+
+               // We need to move to the next file, as we need a clear start for negatives and positives counts
+               moveNextFile(false);
+
+               // Take the snapshots and replace the structures
+
+               dataFilesToProcess.addAll(filesRepository.getDataFiles());
+
+               filesRepository.clearDataFiles();
+
+               if (dataFilesToProcess.size() == 0)
+               {
+                  trace("Finishing compacting, nothing to process");
+                  return;
+               }
+
+               compactor = new JournalCompactor(fileFactory,
+                                                this,
+                                                filesRepository,
+                                                records.keySet(),
+                                                dataFilesToProcess.get(0).getFileID());
+
+               for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
+               {
+                  compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
+                  entry.getValue().setCompacting();
+               }
+
+               // We will calculate the new records during compacting, what will take the position the records will take
+               // after compacting
+               records.clear();
+            }
+            finally
+            {
+               journalLock.writeLock().unlock();
+            }
+
+            Collections.sort(dataFilesToProcess, new JournalFileComparator());
+
+            // This is where most of the work is done, taking most of the time of the compacting routine.
+            // Notice there are no locks while this is being done.
+
+            // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
+            // well
+            for (final JournalFile file : dataFilesToProcess)
+            {
+               try
+               {
+                  JournalImpl.readJournalFile(fileFactory, file, compactor);
+               }
+               catch (Throwable e)
+               {
+                  HornetQJournalLogger.LOGGER.compactReadError(file);
+                  throw new Exception("Error on reading compacting for " + file, e);
+               }
+            }
+
+            compactor.flush();
+
+            // pointcut for tests
+            // We need to test concurrent updates on the journal, as the compacting is being performed.
+            // Usually tests will use this to hold the compacting while other structures are being updated.
+            onCompactDone();
+
+            List<JournalFile> newDatafiles = null;
+
+            JournalCompactor localCompactor = compactor;
+
+            SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
+
+            journalLock.writeLock().lock();
+            try
+            {
+               // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
+               compactor = null;
+
+               onCompactLockingTheJournal();
+
+               newDatafiles = localCompactor.getNewDataFiles();
+
+               // Restore newRecords created during compacting
+               for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet())
+               {
+                  records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
+               }
+
+               // Restore compacted dataFiles
+               for (int i = newDatafiles.size() - 1; i >= 0; i--)
+               {
+                  JournalFile fileToAdd = newDatafiles.get(i);
+                  if (JournalImpl.trace)
+                  {
+                     JournalImpl.trace("Adding file " + fileToAdd + " back as datafile");
+                  }
+                  filesRepository.addDataFileOnTop(fileToAdd);
+               }
+
+               if (JournalImpl.trace)
+               {
+                  JournalImpl.trace("There are " + filesRepository.getDataFilesCount() + " datafiles Now");
+               }
+
+               // Replay pending commands (including updates, deletes and commits)
+
+               for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
+               {
+                  newTransaction.replaceRecordProvider(this);
+               }
+
+               localCompactor.replayPendingCommands();
+
+               // Merge transactions back after compacting.
+               // This has to be done after the replay pending commands, as we need to delete commits
+               // that happened during the compacting
+
+               for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
+               {
+                  if (JournalImpl.trace)
+                  {
+                     JournalImpl.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
+                  }
+                  JournalTransaction liveTransaction = transactions.get(newTransaction.getId());
+                  if (liveTransaction != null)
+                  {
+                     liveTransaction.merge(newTransaction);
+                  }
+                  else
+                  {
+                     HornetQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
+                  }
+               }
+            }
+            finally
+            {
+               journalLock.writeLock().unlock();
+            }
+
+            // At this point the journal is unlocked. We keep renaming files while the journal is already operational
+            renameFiles(dataFilesToProcess, newDatafiles);
+            deleteControlFile(controlFile);
+
+            HornetQJournalLogger.LOGGER.debug("Finished compacting on journal");
+
+         }
+         finally
+         {
+            // An Exception was probably thrown, and the compactor was not cleared
+            if (compactor != null)
+            {
+               try
+               {
+                  compactor.flush();
+               }
+               catch (Throwable ignored)
+               {
+               }
+
+               compactor = null;
+            }
+            setAutoReclaim(previousReclaimValue);
+         }
+      }
+      finally
+      {
+         compactorLock.writeLock().unlock();
+      }
+
+   }
+
+   /**
+    * <p>Load data accordingly to the record layouts</p>
+    * <p/>
+    * <p>Basic record layout:</p>
+    * <table border=1>
+    * <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
+    * <tr><td>RecordType</td><td>Byte (1)</td></tr>
+    * <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
+    * <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
+    * <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
+    * <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>UserDefinedRecordType (If add/update only)</td><td>Byte (1)</td</tr>
+    * <tr><td>RecordBody</td><td>Byte Array (size=BodySize)</td></tr>
+    * <tr><td>Check Size</td><td>Integer (4 bytes)</td></tr>
+    * </table>
+    * <p/>
+    * <p> The check-size is used to validate if the record is valid and complete </p>
+    * <p/>
+    * <p>Commit/Prepare record layout:</p>
+    * <table border=1>
+    * <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
+    * <tr><td>RecordType</td><td>Byte (1)</td></tr>
+    * <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
+    * <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
+    * <tr><td>ExtraDataLength (Prepares only)</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>Number Of Files (N)</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>ExtraDataBytes</td><td>Bytes (sized by ExtraDataLength)</td></tr>
+    * <tr><td>* FileID(n)</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>* NumberOfElements(n)</td><td>Integer (4 bytes)</td></tr>
+    * <tr><td>CheckSize</td><td>Integer (4 bytes)</td</tr>
+    * </table>
+    * <p/>
+    * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
+    */
+   public JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
+   {
+      return load(loadManager, true, null);
+   }
+
+   /**
+    * @param loadManager
+    * @param changeData
+    * @param replicationSync {@code true} will place
+    * @return
+    * @throws Exception
+    */
+   private synchronized JournalLoadInformation load(final LoaderCallback loadManager, final boolean changeData,
+                                                    final JournalState replicationSync) throws Exception
+   {
+      if (state == JournalState.STOPPED || state == JournalState.LOADED)
+      {
+         throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
+                                            state);
+      }
+      if (state == replicationSync)
+      {
+         throw new IllegalStateException("Journal cannot be in state " + JournalState.STARTED);
+      }
+
+      checkControlFile();
+
+      records.clear();
+
+      filesRepository.clear();
+
+      transactions.clear();
+      currentFile = null;
+
+      final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
+
+      final List<JournalFile> orderedFiles = orderFiles();
+
+      filesRepository.calculateNextfileID(orderedFiles);
+
+      int lastDataPos = JournalImpl.SIZE_HEADER;
+
+      // AtomicLong is used only as a reference, not as an Atomic value
+      final AtomicLong maxID = new AtomicLong(-1);
+
+      for (final JournalFile file : orderedFiles)
+      {
+         JournalImpl.trace("Loading file " + file.getFile().getFileName());
+
+         final AtomicBoolean hasData = new AtomicBoolean(false);
+
+         int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+         {
+
+            private void checkID(final long id)
+            {
+               if (id > maxID.longValue())
+               {
+                  maxID.set(id);
+               }
+            }
+
+            public void onReadAddRecord(final RecordInfo info) throws Exception
+            {
+               checkID(info.id);
+
+               hasData.set(true);
+
+               loadManager.addRecord(info);
+
+               records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
+            }
+
+            public void onReadUpdateRecord(final RecordInfo info) throws Exception
+            {
+               checkID(info.id);
+
+               hasData.set(true);
+
+               loadManager.updateRecord(info);
+
+               JournalRecord posFiles = records.get(info.id);
+
+               if (posFiles != null)
+               {
+                  // It's legal for this to be null. The file(s) with the may
+                  // have been deleted
+                  // just leaving some updates in this file
+
+                  posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
+                  // count
+               }
+            }
+
+            public void onReadDeleteRecord(final long recordID) throws Exception
+            {
+               hasData.set(true);
+
+               loadManager.deleteRecord(recordID);
+
+               JournalRecord posFiles = records.remove(recordID);
+
+               if (posFiles != null)
+               {
+                  posFiles.delete(file);
+               }
+            }
+
+            public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
+            {
+               onReadAddRecordTX(transactionID, info);
+            }
+
+            public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
+            {
+
+               checkID(info.id);
+
+               hasData.set(true);
+
+               TransactionHolder tx = loadTransactions.get(transactionID);
+
+               if (tx == null)
+               {
+                  tx = new TransactionHolder(transactionID);
+
+                  loadTransactions.put(transactionID, tx);
+               }
+
+               tx.recordInfos.add(info);
+
+               JournalTransaction tnp = transactions.get(transactionID);
+
+               if (tnp == null)
+               {
+                  tnp = new JournalTransaction(transactionID, JournalImpl.this);
+
+                  transactions.put(transactionID, tnp);
+               }
+
+               tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
+               // count
+            }
+
+            public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
+            {
+               hasData.set(true);
+
+               TransactionHolder tx = loadTransactions.get(transactionID);
+
+               if (tx == null)
+               {
+                  tx = new TransactionHolder(transactionID);
+
+                  loadTransactions.put(transactionID, tx);
+               }
+
+               tx.recordsToDelete.add(info);
+
+               JournalTransaction tnp = transactions.get(transactionID);
+
+               if (tnp == null)
+               {
+                  tnp = new JournalTransaction(transactionID, JournalImpl.this);
+
+                  transactions.put(transactionID, tnp);
+               }
+
+               tnp.addNegative(file, info.id);
+
+            }
+
+            public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+            {
+               hasData.set(true);
+
+               TransactionHolder tx = loadTransactions.get(transactionID);
+
+               if (tx == null)
+               {
+                  // The user could choose to prepare empty transactions
+                  tx = new TransactionHolder(transactionID);
+
+                  loadTransactions.put(transactionID, tx);
+               }
+
+               tx.prepared = true;
+
+               tx.extraData = extraData;
+
+               JournalTransaction journalTransaction = transactions.get(transactionID);
+
+               if (journalTransaction == null)
+               {
+                  journalTransaction = new JournalTransaction(transactionID, JournalImpl.this);
+
+                  transactions.put(transactionID, journalTransaction);
+               }
+
+               boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
+
+               if (healthy)
+               {
+                  journalTransaction.prepare(file);
+               }
+               else
+               {
+                  HornetQJournalLogger.LOGGER.preparedTXIncomplete(transactionID);
+                  tx.invalid = true;
+               }
+            }
+
+            public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+            {
+               TransactionHolder tx = loadTransactions.remove(transactionID);
+
+               // The commit could be alone on its own journal-file and the
+               // whole transaction body was reclaimed but not the
+               // commit-record
+               // So it is completely legal to not find a transaction at this
+               // point
+               // If we can't find it, we assume the TX was reclaimed and we
+               // ignore this
+               if (tx != null)
+               {
+                  JournalTransaction journalTransaction = transactions.remove(transactionID);
+
+                  if (journalTransaction == null)
+                  {
+                     throw new IllegalStateException("Cannot find tx " + transactionID);
+                  }
+
+                  boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
+
+                  if (healthy)
+                  {
+                     for (RecordInfo txRecord : tx.recordInfos)
+                     {
+                        if (txRecord.isUpdate)
+                        {
+                           loadManager.updateRecord(txRecord);
+                        }
+                        else
+                        {
+                           loadManager.addRecord(txRecord);
+                        }
+                     }
+
+                     for (RecordInfo deleteValue : tx.recordsToDelete)
+                     {
+                        loadManager.deleteRecord(deleteValue.id);
+                     }
+
+                     journalTransaction.commit(file);
+                  }
+                  else
+                  {
+                     HornetQJournalLogger.LOGGER.txMissingElements(transactionID);
+
+                     journalTransaction.forget();
+                  }
+
+                  hasData.set(true);
+               }
+
+            }
+
+            public void onReadRollbackRecord(final long transactionID) throws Exception
+            {
+               TransactionHolder tx = loadTransactions.remove(transactionID);
+
+               // The rollback could be alone on its own journal-file and the
+               // whole transaction body was reclaimed but the commit-record
+               // So it is completely legal to not find a transaction at this
+               // point
+               if (tx != null)
+               {
+                  JournalTransaction tnp = transactions.remove(transactionID);
+
+                  if (tnp == null)
+                  {
+                     throw new IllegalStateException("Cannot find tx " + transactionID);
+                  }
+
+                  // There is no need to validate summaries/holes on
+                  // Rollbacks.. We will ignore the data anyway.
+                  tnp.rollback(file);
+
+                  hasData.set(true);
+               }
+            }
+
+            public void markAsDataFile(final JournalFile file)
+            {
+               hasData.set(true);
+            }
+
+         });
+
+         if (hasData.get())
+         {
+            lastDataPos = resultLastPost;
+            filesRepository.addDataFileOnBottom(file);
+         }
+         else
+         {
+            if (changeData)
+            {
+               // Empty dataFiles with no data
+               filesRepository.addFreeFile(file, false, false);
+            }
+         }
+      }
+
+      if (replicationSync == JournalState.SYNCING)
+      {
+         assert filesRepository.getDataFiles().isEmpty();
+         setJournalState(JournalState.SYNCING);
+         return new JournalLoadInformation(0, -1);
+      }
+
+      setUpCurrentFile(lastDataPos);
+
+      setJournalState(JournalState.LOADED);
+
+      for (TransactionHolder transaction : loadTransactions.values())
+      {
+         if ((!transaction.prepared || transaction.invalid) && replicationSync != JournalState.SYNCING_UP_TO_DATE)
+         {
+            HornetQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
+
+            if (changeData)
+            {
+               // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
+               this.appendRollbackRecord(transaction.transactionID, false);
+            }
+
+            loadManager.failedTransaction(transaction.transactionID,
+                                          transaction.recordInfos,
+                                          transaction.recordsToDelete);
+         }
+         else
+         {
+            for (RecordInfo info : transaction.recordInfos)
+            {
+               if (info.id > maxID.get())
+               {
+                  maxID.set(info.id);
+               }
+            }
+
+            PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
+
+            info.records.addAll(transaction.recordInfos);
+
+            info.recordsToDelete.addAll(transaction.recordsToDelete);
+
+            loadManager.addPreparedTransaction(info);
+         }
+      }
+
+      checkReclaimStatus();
+
+      return new JournalLoadInformation(records.size(), maxID.longValue());
+   }
+
+   /**
+    * @return true if cleanup was called
+    */
+   public final boolean checkReclaimStatus() throws Exception
+   {
+
+      if (compactorRunning.get())
+      {
+         return false;
+      }
+
+      // We can't start reclaim while compacting is working
+      while (true)
+      {
+         if (state != JournalImpl.JournalState.LOADED)
+            return false;
+         if (!isAutoReclaim())
+            return false;
+         if (journalLock.readLock().tryLock(250, TimeUnit.MILLISECONDS))
+            break;
+      }
+      try
+      {
+         reclaimer.scan(getDataFiles());
+
+         for (JournalFile file : filesRepository.getDataFiles())
+         {
+            if (file.isCanReclaim())
+            {
+               // File can be reclaimed or deleted
+               if (JournalImpl.trace)
+               {
+                  JournalImpl.trace("Reclaiming file " + file);
+               }
+
+               filesRepository.removeDataFile(file);
+
+               filesRepository.addFreeFile(file, false);
+            }
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+
+      return false;
+   }
+
+   private boolean needsCompact() throws Exception
+   {
+      JournalFile[] dataFiles = getDataFiles();
+
+      long totalLiveSize = 0;
+
+      for (JournalFile file : dataFiles)
+      {
+         totalLiveSize += file.getLiveSize();
+      }
+
+      long totalBytes = dataFiles.length * (long) fileSize;
+
+      long compactMargin = (long) (totalBytes * compactPercentage);
+
+      boolean needCompact = totalLiveSize < compactMargin && dataFiles.length > compactMinFiles;
+
+      return needCompact;
+
+   }
+
+   private void checkCompact() throws Exception
+   {
+      if (compactMinFiles == 0)
+      {
+         // compacting is disabled
+         return;
+      }
+
+      if (state != JournalState.LOADED)
+      {
+         return;
+      }
+
+      if (!compactorRunning.get() && needsCompact())
+      {
+         scheduleCompact();
+      }
+   }
+
+   private void scheduleCompact()
+   {
+      if (!compactorRunning.compareAndSet(false, true))
+      {
+         return;
+      }
+
+      // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+      // operations (that will use the executor)
+      compactorExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+
+            try
+            {
+               JournalImpl.this.compact();
+            }
+            catch (Throwable e)
+            {
+               HornetQJournalLogger.LOGGER.errorCompacting(e);
+            }
+            finally
+            {
+               compactorRunning.set(false);
+            }
+         }
+      });
+   }
+
+   // TestableJournal implementation
+   // --------------------------------------------------------------
+
+   public final void setAutoReclaim(final boolean autoReclaim)
+   {
+      this.autoReclaim = autoReclaim;
+   }
+
+   public final boolean isAutoReclaim()
+   {
+      return autoReclaim;
+   }
+
+   /* Only meant to be used in tests. */
+   @Override
+   public String debug() throws Exception
+   {
+      reclaimer.scan(getDataFiles());
+
+      StringBuilder builder = new StringBuilder();
+
+      for (JournalFile file : filesRepository.getDataFiles())
+      {
+         builder.append("DataFile:" + file +
+                           " posCounter = " +
+                           file.getPosCount() +
+                           " reclaimStatus = " +
+                           file.isCanReclaim() +
+                           " live size = " +
+                           file.getLiveSize() +
+                           "\n");
+         if (file instanceof JournalFileImpl)
+         {
+            builder.append(((JournalFileImpl) file).debug());
+
+         }
+      }
+
+      for (JournalFile file : filesRepository.getFreeFiles())
+      {
+         builder.append("FreeFile:" + file + "\n");
+      }
+
+      if (currentFile != null)
+      {
+         builder.append("CurrentFile:" + currentFile + " posCounter = " + currentFile.getPosCount() + "\n");
+
+         if (currentFile instanceof JournalFileImpl)
+         {
+            builder.append(((JournalFileImpl) currentFile).debug());
+         }
+      }
+      else
+      {
+         builder.append("CurrentFile: No current file at this point!");
+      }
+
+      return builder.toString();
+   }
+
+   /**
+    * Method for use on testcases.
+    * It will call waitComplete on every transaction, so any assertions on the file system will be correct after this
+    */
+   public void debugWait() throws InterruptedException
+   {
+      fileFactory.flush();
+
+      for (JournalTransaction tx : transactions.values())
+      {
+         tx.waitCallbacks();
+      }
+
+      if (filesExecutor != null && !filesExecutor.isShutdown())
+      {
+         // Send something to the closingExecutor, just to make sure we went
+         // until its end
+         final CountDownLatch latch = newLatch(1);
+
+         filesExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               latch.countDown();
+            }
+         });
+
+         awaitLatch(latch, -1);
+      }
+
+   }
+
+   public int getDataFilesCount()
+   {
+      return filesRepository.getDataFilesCount();
+   }
+
+   public JournalFile[] getDataFiles()
+   {
+      return filesRepository.getDataFilesArray();
+   }
+
+   public int getFreeFilesCount()
+   {
+      return filesRepository.getFreeFilesCount();
+   }
+
+   public int getOpenedFilesCount()
+   {
+      return filesRepository.getOpenedFilesCount();
+   }
+
+   public int getIDMapSize()
+   {
+      return records.size();
+   }
+
+   @Override
+   public int getFileSize()
+   {
+      return fileSize;
+   }
+
+   public int getMinFiles()
+   {
+      return minFiles;
+   }
+
+   public String getFilePrefix()
+   {
+      return filesRepository.getFilePrefix();
+   }
+
+   public String getFileExtension()
+   {
+      return filesRepository.getFileExtension();
+   }
+
+   public int getMaxAIO()
+   {
+      return filesRepository.getMaxAIO();
+   }
+
+   public int getUserVersion()
+   {
+      return userVersion;
+   }
+
+   // In some tests we need to force the journal to move to a next file
+   public void forceMoveNextFile() throws Exception
+   {
+      journalLock.readLock().lock();
+      try
+      {
+         lockAppend.lock();
+         try
+         {
+            moveNextFile(false);
+            debugWait();
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.readLock().unlock();
+      }
+   }
+
+   public void perfBlast(final int pages)
+   {
+      new PerfBlast(pages).start();
+   }
+
+   // HornetQComponent implementation
+   // ---------------------------------------------------
+
+   public synchronized boolean isStarted()
+   {
+      return state != JournalState.STOPPED;
+   }
+
+   public synchronized void start()
+   {
+      if (state != JournalState.STOPPED)
+      {
+         throw new IllegalStateException("Journal " + this + " is not stopped, state is " + state);
+      }
+
+      filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+      {
+
+         public Thread newThread(final Runnable r)
+         {
+            return new Thread(r, "JournalImpl::FilesExecutor");
+         }
+      });
+
+      compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+      {
+
+         public Thread newThread(final Runnable r)
+         {
+            return new Thread(r, "JournalImpl::CompactorExecutor");
+         }
+      });
+
+      filesRepository.setExecutor(filesExecutor);
+
+      fileFactory.start();
+
+      setJournalState(JournalState.STARTED);
+   }
+
+   public synchronized void stop() throws Exception
+   {
+      if (state == JournalState.STOPPED)
+      {
+         throw new IllegalStateException("Journal is already stopped");
+      }
+
+
+      journalLock.writeLock().lock();
+      try
+      {
+         lockAppend.lock();
+
+         try
+         {
+
+            setJournalState(JournalState.STOPPED);
+
+            compactorExecutor.shutdown();
+
+            if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
+            {
+               HornetQJournalLogger.LOGGER.couldNotStopCompactor();
+            }
+
+            filesExecutor.shutdown();
+
+            filesRepository.setExecutor(null);
+
+            if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
+            {
+               HornetQJournalLogger.LOGGER.couldNotStopJournalExecutor();
+            }
+
+            try
+            {
+               for (CountDownLatch latch : latches)
+               {
+                  latch.countDown();
+               }
+            }
+            catch (Throwable e)
+            {
+               HornetQJournalLogger.LOGGER.warn(e.getMessage(), e);
+            }
+
+            fileFactory.deactivateBuffer();
+
+            if (currentFile != null && currentFile.getFile().isOpen())
+            {
+               currentFile.getFile().close();
+            }
+
+            filesRepository.clear();
+
+            fileFactory.stop();
+
+            currentFile = null;
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+      }
+      finally
+      {
+         journalLock.writeLock().unlock();
+      }
+   }
+
+   public int getNumberOfRecords()
+   {
+      return records.size();
+   }
+
+
+   protected SequentialFile createControlFile(final List<JournalFile> files,
+                                              final List<JournalFile> newFiles,
+                                              final Pair<String, String> cleanupRename) throws Exception
+   {
+      ArrayList<Pair<String, String>> cleanupList;
+      if (cleanupRename == null)
+      {
+         cleanupList = null;
+      }
+      else
+      {
+         cleanupList = new ArrayList<Pair<String, String>>();
+         cleanupList.add(cleanupRename);
+      }
+      return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList);
+   }
+
+   protected void deleteControlFile(final SequentialFile controlFile) throws Exception
+   {
+      controlFile.delete();
+   }
+
+   /**
+    * being protected as testcases can override this method
+    */
+   protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
+   {
+
+      // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong
+      // order
+      // These files are already freed, and are described on the compactor file control.
+      // In case of crash they will be cleared anyways
+
+      final CountDownLatch done = newLatch(1);
+
+      filesExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               for (JournalFile file : oldFiles)
+               {
+                  try
+                  {
+                     filesRepository.addFreeFile(file, false);
+                  }
+                  catch (Throwable e)
+                  {
+                     HornetQJournalLogger.LOGGER.errorReinitializingFile(e, file);
+                  }
+               }
+            }
+            finally
+            {
+               done.countDown();
+            }
+         }
+      });
+
+      // need to wait all old files to be freed
+      // to avoid a race where the CTR file is deleted before the init for these files is already done
+      // what could cause a duplicate in case of a crash after the CTR is deleted and before the file is initialized
+      awaitLatch(done, -1);
+
+      for (JournalFile file : newFiles)
+      {
+         String newName = JournalImpl.renameExtensionFile(file.getFile().getFileName(), ".cmp");
+         file.getFile().renameTo(newName);
+      }
+
+   }
+
+   /**
+    * @param name
+    * @return
+    */
+   protected static String renameExtensionFile(String name, final String extension)
+   {
+      name = name.substring(0, name.lastIndexOf(extension));
+      return name;
+   }
+
+   /**
+    * This is an interception point for testcases, when the compacted files are written, before replacing the data structures
+    */
+   protected void onCompactStart() throws Exception
+   {
+   }
+
+   /**
+    * This is an interception point for testcases, when the compacted files are written, to be called
+    * as soon as the compactor gets a writeLock
+    */
+   protected void onCompactLockingTheJournal() throws Exception
+   {
+   }
+
+   /**
+    * This is an interception point for testcases, when the compacted files are written, before replacing the data structures
+    */
+   protected void onCompactDone()
+   {
+   }
+
+   // Private
+   // -----------------------------------------------------------------------------
+
+   /**
+    * <p/>
+    * Checks for holes on the transaction (a commit written but with an incomplete transaction).
+    * <p/>
+    * This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the
+    * COMMIT-RECORD.
+    * <p/>
+    * For details see {@link JournalCompleteRecordTX} about how the transaction-summary is recorded.
+    *
+    * @param journalTransaction
+    * @param orderedFiles
+    * @param numberOfRecords
+    * @return
+    */
+   private boolean checkTransactionHealth(final JournalFile currentFile,
+                                          final JournalTransaction journalTransaction,
+                                          final List<JournalFile> orderedFiles,
+                                          final int numberOfRecords)
+   {
+      return journalTransaction.getCounter(currentFile) == numberOfRecords;
+   }
+
+   private static boolean isTransaction(final byte recordType)
+   {
+      return recordType == JournalImpl.ADD_RECORD_TX || recordType == JournalImpl.UPDATE_RECORD_TX ||
+         recordType == JournalImpl.DELETE_RECORD_TX ||
+         JournalImpl.isCompleteTransaction(recordType);
+   }
+
+   private static boolean isCompleteTransaction(final byte recordType)
+   {
+      return recordType == JournalImpl.COMMIT_RECORD || recordType == JournalImpl.PREPARE_RECORD ||
+         recordType == JournalImpl.ROLLBACK_RECORD;
+   }
+
+   private static boolean isContainsBody(final byte recordType)
+   {
+      return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
+   }
+
+   private static int getRecordSize(final byte recordType, final int journalVersion)
+   {
+      // The record size (without the variable portion)
+      int recordSize = 0;
+      switch (recordType)
+      {
+         case ADD_RECORD:
+            recordSize = JournalImpl.SIZE_ADD_RECORD;
+            break;
+         case UPDATE_RECORD:
+            recordSize = JournalImpl.SIZE_ADD_RECORD;
+            break;
+         case ADD_RECORD_TX:
+            recordSize = JournalImpl.SIZE_ADD_RECORD_TX;
+            break;
+         case UPDATE_RECORD_TX:
+            recordSize = JournalImpl.SIZE_ADD_RECORD_TX;
+            break;
+         case DELETE_RECORD:
+            recordSize = JournalImpl.SIZE_DELETE_RECORD;
+            break;
+         case DELETE_RECORD_TX:
+            recordSize = JournalImpl.SIZE_DELETE_RECORD_TX;
+            break;
+         case PREPARE_RECORD:
+            recordSize = JournalImpl.SIZE_PREPARE_RECORD;
+            break;
+         case COMMIT_RECORD:
+            recordSize = JournalImpl.SIZE_COMMIT_RECORD;
+            break;
+         case ROLLBACK_RECORD:
+            recordSize = JournalImpl.SIZE_ROLLBACK_RECORD;
+            break;
+         default:
+            // Sanity check, this was previously tested, nothing different
+            // should be on this switch
+            throw new IllegalStateException("Record other than expected");
+
+      }
+      if (journalVersion >= 2)
+      {
+         return recordSize + 1;
+      }
+      else
+      {
+         return recordSize;
+      }
+   }
+
+   /**
+    * @param file
+    * @return
+    * @throws Exception
+    */
+   private JournalFileImpl readFileHeader(final SequentialFile file) throws Exception
+   {
+      ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+      file.read(bb);
+
+      int journalVersion = bb.getInt();
+
+      if (journalVersion != JournalImpl.FORMAT_VERSION)
+      {
+         boolean isCompatible = false;
+
+         for (int v : JournalImpl.COMPATIBLE_VERSIONS)
+         {
+            if (v == journalVersion)
+            {
+               isCompatible = true;
+            }
+         }
+
+         if (!isCompatible)
+         {
+            throw HornetQJournalBundle.BUNDLE.journalFileMisMatch();
+         }
+      }
+
+      int readUserVersion = bb.getInt();
+
+      if (readUserVersion != userVersion)
+      {
+         throw HornetQJournalBundle.BUNDLE.journalDifferentVersion();
+      }
+
+      long fileID = bb.getLong();
+
+      fileFactory.releaseBuffer(bb);
+
+      bb = null;
+
+      return new JournalFileImpl(file, fileID, journalVersion);
+   }
+
+   /**
+    * @param fileID
+    * @param sequentialFile
+    * @throws Exception
+    */
+   public static int initFileHeader(final SequentialFileFactory fileFactory,
+                                    final SequentialFile sequentialFile,
+                                    final int userVersion,
+                                    final long fileID) throws Exception
+   {
+      // We don't need to release buffers while writing.
+      ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
+
+      try
+      {
+         JournalImpl.writeHeader(buffer, userVersion, fileID);
+
+         bb.rewind();
+
+         int bufferSize = bb.limit()

<TRUNCATED>

Mime
View raw message