activemq-commits mailing list archives

From andytay...@apache.org
Subject [08/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:38 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java
new file mode 100644
index 0000000..4fa1020
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalCompactor.java
@@ -0,0 +1,639 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.core.journal.RecordInfo;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX.TX_RECORD_TYPE;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalRollbackRecordTX;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * A JournalCompactor
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider
+{
+   // We try to separate old records from new ones when compacting;
+   // COMPACT_SPLIT_LINE is the split point.
+   // We will force a move to the next file when the compactCount is below COMPACT_SPLIT_LINE.
+   private static final short COMPACT_SPLIT_LINE = 2;
+
+   // Snapshot of transactions that were pending when the compactor started
+   private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
+
+   private final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+
+   private final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
+
+   /** Commands that arrived while compacting was in progress.
+    *  We can't process any counts during compacting, as we don't yet know in which files the records will end up, so
+    *  we cache those updates and apply them against the right files once compacting is done. */
+   private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
+
+   public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+                                                final List<String> dataFiles,
+                                                final List<String> newFiles,
+                                                final List<Pair<String, String>> renameFile) throws Exception
+   {
+      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1);
+
+      if (controlFile.exists())
+      {
+         JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
+
+         final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+         JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract()
+         {
+            @Override
+            public void onReadAddRecord(final RecordInfo info) throws Exception
+            {
+               records.add(info);
+            }
+         });
+
+         if (records.size() == 0)
+         {
+            return null;
+         }
+         else
+         {
+            HornetQBuffer input = HornetQBuffers.wrappedBuffer(records.get(0).data);
+
+            int numberDataFiles = input.readInt();
+
+            for (int i = 0; i < numberDataFiles; i++)
+            {
+               dataFiles.add(input.readUTF());
+            }
+
+            int numberNewFiles = input.readInt();
+
+            for (int i = 0; i < numberNewFiles; i++)
+            {
+               newFiles.add(input.readUTF());
+            }
+
+            int numberRenames = input.readInt();
+            for (int i = 0; i < numberRenames; i++)
+            {
+               String from = input.readUTF();
+               String to = input.readUTF();
+               renameFile.add(new Pair<String, String>(from, to));
+            }
+
+         }
+
+         return controlFile;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   public List<JournalFile> getNewDataFiles()
+   {
+      return newDataFiles;
+   }
+
+   public Map<Long, JournalRecord> getNewRecords()
+   {
+      return newRecords;
+   }
+
+   public Map<Long, JournalTransaction> getNewTransactions()
+   {
+      return newTransactions;
+   }
+
+   public JournalCompactor(final SequentialFileFactory fileFactory,
+                           final JournalImpl journal,
+                           final JournalFilesRepository filesRepository,
+                           final Set<Long> recordsSnapshot,
+                           final long firstFileID)
+   {
+      super(fileFactory, journal, filesRepository, recordsSnapshot, firstFileID);
+   }
+
+   /** This method informs the Compactor about the existence of a pending (non-committed) transaction */
+   public void addPendingTransaction(final long transactionID, final long[] ids)
+   {
+      pendingTransactions.put(transactionID, new PendingTransaction(ids));
+   }
+
+   public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile)
+   {
+      pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile));
+
+      long[] ids = liveTransaction.getPositiveArray();
+
+      PendingTransaction oldTransaction = pendingTransactions.get(liveTransaction.getId());
+      long[] ids2 = null;
+
+      if (oldTransaction != null)
+      {
+         ids2 = oldTransaction.pendingIDs;
+      }
+
+      /* If a delete arrives for these records while the compactor is still working, we need to be able to take it into
+       * account for later deletes instead of throwing exceptions about non-existent records. */
+      if (ids != null)
+      {
+         for (long id : ids)
+         {
+            addToRecordsSnaptshot(id);
+         }
+      }
+
+      if (ids2 != null)
+      {
+         for (long id : ids2)
+         {
+            addToRecordsSnaptshot(id);
+         }
+      }
+   }
+
+   public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile)
+   {
+      pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile));
+   }
+
+   /**
+    * @param id
+    * @param usedFile
+    */
+   public void addCommandDelete(final long id, final JournalFile usedFile)
+   {
+      pendingCommands.add(new DeleteCompactCommand(id, usedFile));
+   }
+
+   /**
+    * @param id
+    * @param usedFile
+    */
+   public void addCommandUpdate(final long id, final JournalFile usedFile, final int size)
+   {
+      pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
+   }
+
+   private void checkSize(final int size) throws Exception
+   {
+      checkSize(size, -1);
+   }
+
+   private void checkSize(final int size, final int compactCount) throws Exception
+   {
+      if (getWritingChannel() == null)
+      {
+         if (!checkCompact(compactCount))
+         {
+            // will need to open a file either way
+            openFile();
+         }
+      }
+      else
+      {
+         if (compactCount >= 0)
+         {
+            if (checkCompact(compactCount))
+            {
+               // The file was already moved in this case, so there is no need to check the size.
+               // Otherwise we still need to check the size below.
+               return;
+            }
+         }
+
+         if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
+         {
+            openFile();
+         }
+      }
+   }
+
+   int currentCount;
+
+   // This means we will need to split when the compactCount is below the watermark
+   boolean willNeedToSplit = false;
+
+   boolean splitted = false;
+
+   private boolean checkCompact(final int compactCount) throws Exception
+   {
+      if (compactCount >= COMPACT_SPLIT_LINE && !splitted)
+      {
+         willNeedToSplit = true;
+      }
+
+      if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
+      {
+         willNeedToSplit = false;
+         splitted = false;
+         openFile();
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   /**
+    * Replay the pending commands (counts) that were cached while compacting was in progress
+    */
+   public void replayPendingCommands()
+   {
+      for (CompactCommand command : pendingCommands)
+      {
+         try
+         {
+            command.execute();
+         }
+         catch (Exception e)
+         {
+            HornetQJournalLogger.LOGGER.errorReplayingCommands(e);
+         }
+      }
+
+      pendingCommands.clear();
+   }
+
+   // JournalReaderCallback implementation -------------------------------------------
+
+   public void onReadAddRecord(final RecordInfo info) throws Exception
+   {
+      if (lookupRecord(info.id))
+      {
+         JournalInternalRecord addRecord = new JournalAddRecord(true,
+                                                                info.id,
+                                                                info.getUserRecordType(),
+                                                                new ByteArrayEncoding(info.data));
+         addRecord.setCompactCount((short)(info.compactCount + 1));
+
+         checkSize(addRecord.getEncodeSize(), info.compactCount);
+
+         writeEncoder(addRecord);
+
+         newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
+      }
+   }
+
+   public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id))
+      {
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         JournalInternalRecord record = new JournalAddRecordTX(true,
+                                                               transactionID,
+                                                               info.id,
+                                                               info.getUserRecordType(),
+                                                               new ByteArrayEncoding(info.data));
+
+         record.setCompactCount((short)(info.compactCount + 1));
+
+         checkSize(record.getEncodeSize(), info.compactCount);
+
+         newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
+
+         writeEncoder(record);
+      }
+   }
+
+   public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+   {
+
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         // Sanity check, this should never happen
+         HornetQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID);
+      }
+      else
+      {
+         JournalTransaction newTransaction = newTransactions.remove(transactionID);
+         if (newTransaction != null)
+         {
+            JournalInternalRecord commitRecord =
+                     new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, transactionID, null);
+
+            checkSize(commitRecord.getEncodeSize());
+
+            writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
+
+            newTransaction.commit(currentFile);
+         }
+      }
+   }
+
+   public void onReadDeleteRecord(final long recordID) throws Exception
+   {
+      if (newRecords.get(recordID) != null)
+      {
+         // Sanity check, it should never happen
+         HornetQJournalLogger.LOGGER.inconsistencyDuringCompactingDelete(recordID);
+      }
+
+   }
+
+   public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         JournalInternalRecord record = new JournalDeleteRecordTX(transactionID,
+                                                                  info.id,
+                                                                  new ByteArrayEncoding(info.data));
+
+         checkSize(record.getEncodeSize());
+
+         writeEncoder(record);
+
+         newTransaction.addNegative(currentFile, info.id);
+      }
+      // else.. nothing to be done
+   }
+
+   public void markAsDataFile(final JournalFile file)
+   {
+      // nothing to be done here
+   }
+
+   public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         JournalInternalRecord prepareRecord =
+                  new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, transactionID, new ByteArrayEncoding(extraData));
+
+         checkSize(prepareRecord.getEncodeSize());
+
+         writeEncoder(prepareRecord, newTransaction.getCounter(currentFile));
+
+         newTransaction.prepare(currentFile);
+
+      }
+   }
+
+   public void onReadRollbackRecord(final long transactionID) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null)
+      {
+         // Sanity check, this should never happen
+         throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+                                         " for an already rolled back transaction during compacting");
+      }
+      else
+      {
+         JournalTransaction newTransaction = newTransactions.remove(transactionID);
+         if (newTransaction != null)
+         {
+
+            JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
+
+            checkSize(rollbackRecord.getEncodeSize());
+
+            writeEncoder(rollbackRecord);
+
+            newTransaction.rollback(currentFile);
+         }
+
+      }
+   }
+
+   public void onReadUpdateRecord(final RecordInfo info) throws Exception
+   {
+      if (lookupRecord(info.id))
+      {
+         JournalInternalRecord updateRecord = new JournalAddRecord(false,
+                                                                   info.id,
+                                                                   info.userRecordType,
+                                                                   new ByteArrayEncoding(info.data));
+
+         updateRecord.setCompactCount((short)(info.compactCount + 1));
+
+         checkSize(updateRecord.getEncodeSize(), info.compactCount);
+
+         JournalRecord newRecord = newRecords.get(info.id);
+
+         if (newRecord == null)
+         {
+            HornetQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id);
+         }
+         else
+         {
+            newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
+         }
+
+         writeEncoder(updateRecord);
+      }
+   }
+
+   public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
+   {
+      if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id))
+      {
+         JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false,
+                                                                       transactionID,
+                                                                       info.id,
+                                                                       info.userRecordType,
+                                                                       new ByteArrayEncoding(info.data));
+
+         updateRecordTX.setCompactCount((short)(info.compactCount + 1));
+
+         checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
+
+         writeEncoder(updateRecordTX);
+
+         newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
+      }
+      else
+      {
+         onReadUpdateRecord(info);
+      }
+   }
+
+   /**
+    * @param transactionID
+    * @return
+    */
+   private JournalTransaction getNewJournalTransaction(final long transactionID)
+   {
+      JournalTransaction newTransaction = newTransactions.get(transactionID);
+      if (newTransaction == null)
+      {
+         newTransaction = new JournalTransaction(transactionID, this);
+         newTransactions.put(transactionID, newTransaction);
+      }
+      return newTransaction;
+   }
+
+   private abstract static class CompactCommand
+   {
+      abstract void execute() throws Exception;
+   }
+
+   private class DeleteCompactCommand extends CompactCommand
+   {
+      long id;
+
+      JournalFile usedFile;
+
+      public DeleteCompactCommand(final long id, final JournalFile usedFile)
+      {
+         this.id = id;
+         this.usedFile = usedFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalRecord deleteRecord = journal.getRecords().remove(id);
+         if (deleteRecord == null)
+         {
+            HornetQJournalLogger.LOGGER.noRecordDuringCompactReplay(id);
+         }
+         else
+         {
+            deleteRecord.delete(usedFile);
+         }
+      }
+   }
+
+   private static class PendingTransaction
+   {
+      long[] pendingIDs;
+
+      PendingTransaction(final long[] ids)
+      {
+         pendingIDs = ids;
+      }
+
+   }
+
+   private class UpdateCompactCommand extends CompactCommand
+   {
+      private final long id;
+
+      private final JournalFile usedFile;
+
+      private final int size;
+
+      public UpdateCompactCommand(final long id, final JournalFile usedFile, final int size)
+      {
+         this.id = id;
+         this.usedFile = usedFile;
+         this.size = size;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalRecord updateRecord = journal.getRecords().get(id);
+         updateRecord.addUpdateFile(usedFile, size);
+      }
+   }
+
+   private class CommitCompactCommand extends CompactCommand
+   {
+      private final JournalTransaction liveTransaction;
+
+      /** File containing the commit record */
+      private final JournalFile commitFile;
+
+      public CommitCompactCommand(final JournalTransaction liveTransaction, final JournalFile commitFile)
+      {
+         this.liveTransaction = liveTransaction;
+         this.commitFile = commitFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId());
+         if (newTransaction != null)
+         {
+            liveTransaction.merge(newTransaction);
+            liveTransaction.commit(commitFile);
+         }
+         newTransactions.remove(liveTransaction.getId());
+      }
+   }
+
+   private class RollbackCompactCommand extends CompactCommand
+   {
+      private final JournalTransaction liveTransaction;
+
+      /** File containing the rollback record */
+      private final JournalFile rollbackFile;
+
+      public RollbackCompactCommand(final JournalTransaction liveTransaction, final JournalFile rollbackFile)
+      {
+         this.liveTransaction = liveTransaction;
+         this.rollbackFile = rollbackFile;
+      }
+
+      @Override
+      void execute() throws Exception
+      {
+         JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId());
+         if (newTransaction != null)
+         {
+            liveTransaction.merge(newTransaction);
+            liveTransaction.rollback(rollbackFile);
+         }
+         newTransactions.remove(liveTransaction.getId());
+      }
+   }
+
+   @Override
+   public JournalCompactor getCompactor()
+   {
+      return null;
+   }
+
+   @Override
+   public Map<Long, JournalRecord> getRecords()
+   {
+      return newRecords;
+   }
+
+}
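
As an illustration only (not part of this commit), the sketch below shows how the readControlFile helper above could be used at startup to detect a compact run that did not complete cleanly. It reuses the types imported by the file above; the fileFactory parameter stands in for whatever SequentialFileFactory implementation the journal is configured with, and wasCompactInterrupted is a hypothetical name.

   // Hypothetical recovery probe, assuming an already-configured SequentialFileFactory:
   static boolean wasCompactInterrupted(final SequentialFileFactory fileFactory) throws Exception
   {
      List<String> dataFiles = new ArrayList<String>();
      List<String> newFiles = new ArrayList<String>();
      List<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();

      // Returns the control file and fills the three lists from its contents,
      // or null if no usable control file exists.
      SequentialFile controlFile =
               JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames);

      // A non-null result means a compact was in flight: dataFiles, newFiles and renames
      // describe the files involved so the journal can finish or undo the operation.
      return controlFile != null;
   }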

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java
new file mode 100644
index 0000000..1b5ab65
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+public final class JournalConstants
+{
+
+   public static final int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = 490 * 1024;
+   public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = (int)(1000000000d / 2000);
+   public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO = (int)(1000000000d / 300);
+   public static final int DEFAULT_JOURNAL_BUFFER_SIZE_NIO = 490 * 1024;
+
+}
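
For reference, the two timeout constants above are nanosecond intervals between forced buffer flushes: (int)(1000000000d / 2000) = 500,000 ns (0.5 ms, i.e. at most 2000 flushes per second) for AIO, and (int)(1000000000d / 300) = 3,333,333 ns (roughly 3.3 ms, at most 300 flushes per second) for NIO. Both default buffer sizes are 490 * 1024 = 501,760 bytes.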

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java
new file mode 100644
index 0000000..87f3dbe
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFile.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.core.journal.SequentialFile;
+
+/**
+ *
+ * A JournalFile
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public interface JournalFile
+{
+   int getNegCount(JournalFile file);
+
+   void incNegCount(JournalFile file);
+
+   int getPosCount();
+
+   void incPosCount();
+
+   void decPosCount();
+
+   void addSize(int bytes);
+
+   void decSize(int bytes);
+
+   int getLiveSize();
+
+   /** The total number of delete records in this file that refer to records stored in other files */
+   int getTotalNegativeToOthers();
+
+   /**
+    * Whether this file's contents can be deleted and the file reused.
+    * @param canDelete if {@code true} then this file's contents are unimportant and may be deleted
+    *           at any time.
+    */
+   void setCanReclaim(boolean canDelete);
+
+   /**
+    * Whether this file's contents can be deleted and the file reused.
+    * @return {@code true} if the file can already be deleted.
+    */
+   boolean isCanReclaim();
+
+   /** An ID used to verify that records in this file actually belong to the current file.
+    *  A possible implementation is fileID & Integer.MAX_VALUE. */
+   int getRecordID();
+
+   long getFileID();
+
+   int getJournalVersion();
+
+   SequentialFile getFile();
+}
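
The helper below is an illustrative sketch only (not part of this commit) of how the counters declared in this interface might be driven; the real bookkeeping lives in JournalImpl and JournalRecord, and the onAdd/onDelete helpers and the fileWithDelete/fileWithAdd split are assumptions made for the example.

   // Hypothetical bookkeeping sketch using only the JournalFile API above:
   static void onAdd(final JournalFile file, final int recordSize)
   {
      file.incPosCount();       // one more live record in this file
      file.addSize(recordSize); // its bytes now count towards getLiveSize()
   }

   static void onDelete(final JournalFile fileWithDelete, final JournalFile fileWithAdd, final int recordSize)
   {
      fileWithDelete.incNegCount(fileWithAdd); // the delete in one file cancels a record stored in another
      fileWithAdd.decSize(recordSize);         // the cancelled record no longer counts as live data
   }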

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java
new file mode 100644
index 0000000..8aeee12
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFileImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq6.core.journal.SequentialFile;
+
+/**
+ *
+ * A JournalFileImpl
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public class JournalFileImpl implements JournalFile
+{
+   private final SequentialFile file;
+
+   private final long fileID;
+
+   private final int recordID;
+
+   private long offset;
+
+   private final AtomicInteger posCount = new AtomicInteger(0);
+
+   private final AtomicInteger liveBytes = new AtomicInteger(0);
+
+   private boolean canReclaim;
+
+   private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+
+   private final int version;
+
+   private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
+
+   public JournalFileImpl(final SequentialFile file, final long fileID, final int version)
+   {
+      this.file = file;
+
+      this.fileID = fileID;
+
+      this.version = version;
+
+      recordID = (int)(fileID & Integer.MAX_VALUE);
+   }
+
+   public int getPosCount()
+   {
+      return posCount.intValue();
+   }
+
+   @Override
+   public boolean isCanReclaim()
+   {
+      return canReclaim;
+   }
+
+   @Override
+   public void setCanReclaim(final boolean canReclaim)
+   {
+      this.canReclaim = canReclaim;
+   }
+
+   public void incNegCount(final JournalFile file)
+   {
+      if (file != this)
+      {
+         totalNegativeToOthers.incrementAndGet();
+      }
+      getOrCreateNegCount(file).incrementAndGet();
+   }
+
+   public int getNegCount(final JournalFile file)
+   {
+      AtomicInteger count = negCounts.get(file);
+
+      if (count == null)
+      {
+         return 0;
+      }
+      else
+      {
+         return count.intValue();
+      }
+   }
+
+   public int getJournalVersion()
+   {
+      return version;
+   }
+
+   public void incPosCount()
+   {
+      posCount.incrementAndGet();
+   }
+
+   public void decPosCount()
+   {
+      posCount.decrementAndGet();
+   }
+
+   public long getOffset()
+   {
+      return offset;
+   }
+
+   public long getFileID()
+   {
+      return fileID;
+   }
+
+   public int getRecordID()
+   {
+      return recordID;
+   }
+
+   public void setOffset(final long offset)
+   {
+      this.offset = offset;
+   }
+
+   public SequentialFile getFile()
+   {
+      return file;
+   }
+
+   @Override
+   public String toString()
+   {
+      try
+      {
+         return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")";
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return "Error:" + e.toString();
+      }
+   }
+
+   /** Returns debug information about this journal file */
+   public String debug()
+   {
+      StringBuilder builder = new StringBuilder();
+
+      for (Entry<JournalFile, AtomicInteger> entry : negCounts.entrySet())
+      {
+         builder.append(" file = " + entry.getKey() + " negcount value = " + entry.getValue() + "\n");
+      }
+
+      return builder.toString();
+   }
+
+   private synchronized AtomicInteger getOrCreateNegCount(final JournalFile file)
+   {
+      AtomicInteger count = negCounts.get(file);
+
+      if (count == null)
+      {
+         count = new AtomicInteger();
+         negCounts.put(file, count);
+      }
+
+      return count;
+   }
+
+   @Override
+   public void addSize(final int bytes)
+   {
+      liveBytes.addAndGet(bytes);
+   }
+
+   @Override
+   public void decSize(final int bytes)
+   {
+      liveBytes.addAndGet(-bytes);
+   }
+
+   @Override
+   public int getLiveSize()
+   {
+      return liveBytes.get();
+   }
+
+   public int getTotalNegativeToOthers()
+   {
+      return totalNegativeToOthers.get();
+   }
+
+}
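
A quick worked example of the record ID masking used in the constructor above: fileID & Integer.MAX_VALUE clears the sign bit before the narrowing cast, so the resulting recordID stays non-negative even once fileID grows past Integer.MAX_VALUE.

   long fileID = 2147483650L;                          // 2^31 + 2, no longer fits in an int
   int recordID = (int) (fileID & Integer.MAX_VALUE);  // 0x80000002 & 0x7FFFFFFF = 2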

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java
new file mode 100644
index 0000000..56ab68f
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalFilesRepository.java
@@ -0,0 +1,767 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * This is a helper class for the Journal that controls access to dataFiles, openedFiles and freeFiles,
+ * guaranteeing that they are delivered to the Journal in order.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class JournalFilesRepository
+{
+   private static final boolean trace = HornetQJournalLogger.LOGGER.isTraceEnabled();
+
+   /**
+    * Used to debug the consistency of the journal ordering.
+    * <p/>
+    * This is meant to be false as these extra checks would cause performance issues
+    */
+   private static final boolean CHECK_CONSISTENCE = false;
+
+   // This method exists just to make debugging easier;
+   // log.trace can temporarily be replaced with log.info while debugging the Journal.
+   private static void trace(final String message)
+   {
+      HornetQJournalLogger.LOGGER.trace(message);
+   }
+
+   private final SequentialFileFactory fileFactory;
+
+   private final JournalImpl journal;
+
+   private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+
+   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+   private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+   private final AtomicLong nextFileID = new AtomicLong(0);
+
+   private final int maxAIO;
+
+   private final int minFiles;
+
+   private final int fileSize;
+
+   private final String filePrefix;
+
+   private final String fileExtension;
+
+   private final int userVersion;
+
+   private final AtomicInteger freeFilesCount = new AtomicInteger(0);
+
+   private Executor openFilesExecutor;
+
+   private final Runnable pushOpenRunnable = new Runnable()
+   {
+      public void run()
+      {
+         try
+         {
+            pushOpenedFile();
+         }
+         catch (Exception e)
+         {
+            HornetQJournalLogger.LOGGER.errorPushingFile(e);
+         }
+      }
+   };
+
+   public JournalFilesRepository(final SequentialFileFactory fileFactory,
+                                 final JournalImpl journal,
+                                 final String filePrefix,
+                                 final String fileExtension,
+                                 final int userVersion,
+                                 final int maxAIO,
+                                 final int fileSize,
+                                 final int minFiles)
+   {
+      if (filePrefix == null)
+      {
+         throw new IllegalArgumentException("filePrefix cannot be null");
+      }
+      if (fileExtension == null)
+      {
+         throw new IllegalArgumentException("fileExtension cannot be null");
+      }
+      if (maxAIO <= 0)
+      {
+         throw new IllegalArgumentException("maxAIO must be a positive number");
+      }
+      this.fileFactory = fileFactory;
+      this.maxAIO = maxAIO;
+      this.filePrefix = filePrefix;
+      this.fileExtension = fileExtension;
+      this.minFiles = minFiles;
+      this.fileSize = fileSize;
+      this.userVersion = userVersion;
+      this.journal = journal;
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setExecutor(final Executor fileExecutor)
+   {
+      this.openFilesExecutor = fileExecutor;
+   }
+
+   public void clear() throws Exception
+   {
+      dataFiles.clear();
+
+      freeFiles.clear();
+
+      freeFilesCount.set(0);
+
+      for (JournalFile file : openedFiles)
+      {
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Exception e)
+         {
+            HornetQJournalLogger.LOGGER.errorClosingFile(e);
+         }
+      }
+      openedFiles.clear();
+   }
+
+   public int getMaxAIO()
+   {
+      return maxAIO;
+   }
+
+   public String getFileExtension()
+   {
+      return fileExtension;
+   }
+
+   public String getFilePrefix()
+   {
+      return filePrefix;
+   }
+
+   public void calculateNextfileID(final List<JournalFile> files)
+   {
+
+      for (JournalFile file : files)
+      {
+         final long fileIdFromFile = file.getFileID();
+         final long fileIdFromName = getFileNameID(file.getFile().getFileName());
+
+         // The compactor could create a fileName but use a previously assigned ID.
+         // Because of that we need to take both parts into account
+         setNextFileID(Math.max(fileIdFromName, fileIdFromFile));
+      }
+   }
+
+   /**
+    * Set the {@link #nextFileID} value to {@code targetUpdate} if the current value is less than
+    * {@code targetUpdate}.
+    * <p/>
+    * Notice that {@code nextFileID} is incremented before being used, see
+    * {@link JournalFilesRepository#generateFileID()}.
+    *
+    * @param targetUpdate
+    */
+   public void setNextFileID(final long targetUpdate)
+   {
+      while (true)
+      {
+         final long current = nextFileID.get();
+         if (current >= targetUpdate)
+            return;
+
+         if (nextFileID.compareAndSet(current, targetUpdate))
+            return;
+      }
+   }
+
+   public void ensureMinFiles() throws Exception
+   {
+      int filesToCreate = minFiles - (dataFiles.size() + freeFilesCount.get());
+
+      if (filesToCreate > 0)
+      {
+         for (int i = 0; i < filesToCreate; i++)
+         {
+            // Keeping all files opened can be very costly (mainly on AIO)
+            freeFiles.add(createFile(false, false, true, false, -1));
+            freeFilesCount.getAndIncrement();
+         }
+      }
+
+   }
+
+   public void openFile(final JournalFile file, final boolean multiAIO) throws Exception
+   {
+      if (multiAIO)
+      {
+         file.getFile().open();
+      }
+      else
+      {
+         file.getFile().open(1, false);
+      }
+
+      file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
+   }
+
+   // Data File Operations ==========================================
+
+   public JournalFile[] getDataFilesArray()
+   {
+      return dataFiles.toArray(new JournalFile[dataFiles.size()]);
+   }
+
+   public JournalFile pollLastDataFile()
+   {
+      return dataFiles.pollLast();
+   }
+
+   public void removeDataFile(final JournalFile file)
+   {
+      if (!dataFiles.remove(file))
+      {
+         HornetQJournalLogger.LOGGER.couldNotRemoveFile(file);
+      }
+   }
+
+   public int getDataFilesCount()
+   {
+      return dataFiles.size();
+   }
+
+   public Collection<JournalFile> getDataFiles()
+   {
+      return dataFiles;
+   }
+
+   public void clearDataFiles()
+   {
+      dataFiles.clear();
+   }
+
+   public void addDataFileOnTop(final JournalFile file)
+   {
+      dataFiles.addFirst(file);
+
+      if (CHECK_CONSISTENCE)
+      {
+         checkDataFiles();
+      }
+   }
+
+   public String debugFiles()
+   {
+      StringBuilder buffer = new StringBuilder();
+
+      buffer.append("**********\nCurrent File = " + journal.getCurrentFile() + "\n");
+      buffer.append("**********\nDataFiles:\n");
+      for (JournalFile file : dataFiles)
+      {
+         buffer.append(file.toString() + "\n");
+      }
+      buffer.append("*********\nFreeFiles:\n");
+      for (JournalFile file : freeFiles)
+      {
+         buffer.append(file.toString() + "\n");
+      }
+      return buffer.toString();
+   }
+
+   public synchronized void checkDataFiles()
+   {
+      long seq = -1;
+      for (JournalFile file : dataFiles)
+      {
+         if (file.getFileID() <= seq)
+         {
+            HornetQJournalLogger.LOGGER.checkFiles();
+            HornetQJournalLogger.LOGGER.info(debugFiles());
+            HornetQJournalLogger.LOGGER.seqOutOfOrder();
+            System.exit(-1);
+         }
+
+         if (journal.getCurrentFile() != null && journal.getCurrentFile().getFileID() <= file.getFileID())
+         {
+            HornetQJournalLogger.LOGGER.checkFiles();
+            HornetQJournalLogger.LOGGER.info(debugFiles());
+            HornetQJournalLogger.LOGGER.currentFile(file.getFileID(), journal.getCurrentFile().getFileID(),
+                                                    file.getFileID(), (journal.getCurrentFile() == file));
+
+            // throw new RuntimeException ("Check failure!");
+         }
+
+         if (journal.getCurrentFile() == file)
+         {
+            throw new RuntimeException("Check failure! Current file listed as data file!");
+         }
+
+         seq = file.getFileID();
+      }
+
+      long lastFreeId = -1;
+      for (JournalFile file : freeFiles)
+      {
+         if (file.getFileID() <= lastFreeId)
+         {
+            HornetQJournalLogger.LOGGER.checkFiles();
+            HornetQJournalLogger.LOGGER.info(debugFiles());
+            HornetQJournalLogger.LOGGER.fileIdOutOfOrder();
+
+            throw new RuntimeException("Check failure!");
+         }
+
+         lastFreeId = file.getFileID();
+
+         if (file.getFileID() < seq)
+         {
+            HornetQJournalLogger.LOGGER.checkFiles();
+            HornetQJournalLogger.LOGGER.info(debugFiles());
+            HornetQJournalLogger.LOGGER.fileTooSmall();
+
+            // throw new RuntimeException ("Check failure!");
+         }
+      }
+   }
+
+   public void addDataFileOnBottom(final JournalFile file)
+   {
+      dataFiles.add(file);
+
+      if (CHECK_CONSISTENCE)
+      {
+         checkDataFiles();
+      }
+   }
+
+   // Free File Operations ==========================================
+
+   public int getFreeFilesCount()
+   {
+      return freeFilesCount.get();
+   }
+
+   /**
+    * @param file
+    * @throws Exception
+    */
+   public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+   {
+      addFreeFile(file, renameTmp, true);
+   }
+
+   /**
+    * @param file
+    * @param renameTmp   whether to rename the file as it is added to the free files
+    * @param checkDelete whether to delete the file if the maximum file count has already been reached
+    * @throws Exception
+    */
+   public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp, final boolean checkDelete) throws Exception
+   {
+      long calculatedSize = 0;
+      try
+      {
+         calculatedSize = file.getFile().size();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         System.out.println("Can't get file size on " + file);
+         System.exit(-1);
+      }
+      if (calculatedSize != fileSize)
+      {
+         HornetQJournalLogger.LOGGER.deletingFile(file);
+         file.getFile().delete();
+      }
+      else
+         if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < minFiles))
+         {
+            // Re-initialise it
+
+            if (JournalFilesRepository.trace)
+            {
+               JournalFilesRepository.trace("Adding free file " + file);
+            }
+
+            JournalFile jf = reinitializeFile(file);
+
+            if (renameTmp)
+            {
+               jf.getFile().renameTo(JournalImpl.renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
+            }
+
+            freeFiles.add(jf);
+            freeFilesCount.getAndIncrement();
+         }
+         else
+         {
+            if (trace)
+            {
+               HornetQJournalLogger.LOGGER.trace("DataFiles.size() = " + dataFiles.size());
+               HornetQJournalLogger.LOGGER.trace("openedFiles.size() = " + openedFiles.size());
+               HornetQJournalLogger.LOGGER.trace("minfiles = " + minFiles);
+               HornetQJournalLogger.LOGGER.trace("Free Files = " + freeFilesCount.get());
+               HornetQJournalLogger.LOGGER.trace("File " + file +
+                                                    " being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" +
+                                                    (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size()) +
+                                                    ") < minFiles (" + minFiles + ")");
+            }
+            file.getFile().delete();
+         }
+
+      if (CHECK_CONSISTENCE)
+      {
+         checkDataFiles();
+      }
+   }
+
+   public Collection<JournalFile> getFreeFiles()
+   {
+      return freeFiles;
+   }
+
+   public JournalFile getFreeFile()
+   {
+      JournalFile file = freeFiles.remove();
+      freeFilesCount.getAndDecrement();
+      return file;
+   }
+
+   // Opened files operations =======================================
+
+   public int getOpenedFilesCount()
+   {
+      return openedFiles.size();
+   }
+
+   /**
+    * <p>This method will instantly return the opened file, scheduling opening and reclaiming in the background.</p>
+    * <p>In case there are no cached opened files, this method will block until a file has been opened,
+    * which should only happen if the system is under heavy load from another process (like a backup system, or a DB sharing the same box as HornetQ).</p>
+    */
+   public JournalFile openFile() throws InterruptedException
+   {
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+      }
+
+      if (openFilesExecutor == null)
+      {
+         pushOpenRunnable.run();
+      }
+      else
+      {
+         openFilesExecutor.execute(pushOpenRunnable);
+      }
+
+      JournalFile nextFile = null;
+
+      while (nextFile == null)
+      {
+         nextFile = openedFiles.poll(5, TimeUnit.SECONDS);
+         if (nextFile == null)
+         {
+            HornetQJournalLogger.LOGGER.errorOpeningFile(new Exception("trace"));
+         }
+      }
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("Returning file " + nextFile);
+      }
+
+      return nextFile;
+   }
+
+   /**
+    * Open a file and place it into the openedFiles queue
+    */
+   public void pushOpenedFile() throws Exception
+   {
+      JournalFile nextOpenedFile = takeFile(true, true, true, false);
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("pushing openFile " + nextOpenedFile);
+      }
+
+      if (!openedFiles.offer(nextOpenedFile))
+      {
+         HornetQJournalLogger.LOGGER.failedToAddFile(nextOpenedFile);
+      }
+   }
+
+   public void closeFile(final JournalFile file) throws Exception
+   {
+      fileFactory.deactivateBuffer();
+      file.getFile().close();
+      dataFiles.add(file);
+   }
+
+   /**
+    * This will take a file from freeFiles without initializing it
+    *
+    * @return uninitialized JournalFile
+    * @throws Exception
+    * @see JournalImpl#initFileHeader(SequentialFileFactory, SequentialFile, int, long)
+    */
+   public JournalFile takeFile(final boolean keepOpened,
+                               final boolean multiAIO,
+                               final boolean initFile,
+                               final boolean tmpCompactExtension) throws Exception
+   {
+      JournalFile nextFile = null;
+
+      nextFile = freeFiles.poll();
+
+      if (nextFile != null)
+      {
+         freeFilesCount.getAndDecrement();
+      }
+
+      if (nextFile == null)
+      {
+         nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension, -1);
+      }
+      else
+      {
+         if (tmpCompactExtension)
+         {
+            SequentialFile sequentialFile = nextFile.getFile();
+            sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+         }
+
+         if (keepOpened)
+         {
+            openFile(nextFile, multiAIO);
+         }
+      }
+      return nextFile;
+   }
+
+   /**
+    * Creates files for journal synchronization of a replicated backup.
+    * <p/>
+    * In order to simplify synchronization, the file IDs in the backup match those in the live
+    * server.
+    *
+    * @param fileID the fileID to use when creating the file.
+    */
+   public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
+   {
+      return createFile(false, false, true, false, fileID);
+   }
+
+   /**
+    * This method will create a new file on the file system and pre-fill it with FILL_CHARACTER
+    *
+    * @param keepOpened
+    * @return an initialized journal file
+    * @throws Exception
+    */
+   private JournalFile createFile(final boolean keepOpened,
+                                  final boolean multiAIO,
+                                  final boolean init,
+                                  final boolean tmpCompact,
+                                  final long fileIdPreSet) throws Exception
+   {
+      if (System.getSecurityManager() == null)
+      {
+         return createFile0(keepOpened, multiAIO, init, tmpCompact, fileIdPreSet);
+      }
+      else
+      {
+         try
+         {
+            return AccessController.doPrivileged(new PrivilegedExceptionAction<JournalFile>()
+            {
+               @Override
+               public JournalFile run() throws Exception
+               {
+                  return createFile0(keepOpened, multiAIO, init, tmpCompact, fileIdPreSet);
+               }
+            });
+         }
+         catch (PrivilegedActionException e)
+         {
+            throw unwrapException(e);
+         }
+      }
+   }
+
+   private RuntimeException unwrapException(PrivilegedActionException e) throws Exception
+   {
+      Throwable c = e.getCause();
+      if (c instanceof RuntimeException)
+      {
+         throw (RuntimeException) c;
+      }
+      else if (c instanceof Error)
+      {
+         throw (Error) c;
+      }
+      else
+      {
+         throw new RuntimeException(c);
+      }
+   }
+
+   private JournalFile createFile0(final boolean keepOpened,
+                                   final boolean multiAIO,
+                                   final boolean init,
+                                   final boolean tmpCompact,
+                                   final long fileIdPreSet) throws Exception
+   {
+      long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
+
+      final String fileName = createFileName(tmpCompact, fileID);
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("Creating file " + fileName);
+      }
+
+      String tmpFileName = fileName + ".tmp";
+
+      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+
+      sequentialFile.open(1, false);
+
+      if (init)
+      {
+         sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+
+         JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
+      }
+
+      long position = sequentialFile.position();
+
+      sequentialFile.close();
+
+      if (JournalFilesRepository.trace)
+      {
+         JournalFilesRepository.trace("Renaming file " + tmpFileName + " as " + fileName);
+      }
+
+      sequentialFile.renameTo(fileName);
+
+      if (keepOpened)
+      {
+         if (multiAIO)
+         {
+            sequentialFile.open();
+         }
+         else
+         {
+            sequentialFile.open(1, false);
+         }
+         sequentialFile.position(position);
+      }
+
+      return new JournalFileImpl(sequentialFile, fileID, JournalImpl.FORMAT_VERSION);
+   }
+
+   /**
+    * @param tmpCompact
+    * @param fileID
+    * @return
+    */
+   private String createFileName(final boolean tmpCompact, final long fileID)
+   {
+      String fileName;
+      if (tmpCompact)
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension + ".cmp";
+      }
+      else
+      {
+         fileName = filePrefix + "-" + fileID + "." + fileExtension;
+      }
+      return fileName;
+   }
+
+   private long generateFileID()
+   {
+      return nextFileID.incrementAndGet();
+   }
+
+   /**
+    * Get the ID part of the name
+    */
+   private long getFileNameID(final String fileName)
+   {
+      try
+      {
+         return Long.parseLong(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
+      }
+      catch (Throwable e)
+      {
+         HornetQJournalLogger.LOGGER.errorRetrievingID(e, fileName);
+         return 0;
+      }
+   }
+
+   // Discard the old JournalFile and reinitialize the underlying file with a new ID
+   private JournalFile reinitializeFile(final JournalFile file) throws Exception
+   {
+      long newFileID = generateFileID();
+
+      SequentialFile sf = file.getFile();
+
+      sf.open(1, false);
+
+      int position = JournalImpl.initFileHeader(fileFactory, sf, userVersion, newFileID);
+
+      JournalFile jf = new JournalFileImpl(sf, newFileID, JournalImpl.FORMAT_VERSION);
+
+      sf.position(position);
+
+      sf.close();
+
+      return jf;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "JournalFilesRepository(dataFiles=" + dataFiles + ", freeFiles=" + freeFiles + ", openedFiles=" +
+         openedFiles + ")";
+   }
+}
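
As a usage illustration only (not part of this commit), the sketch below shows the open/close cycle the repository above is built around; repository is assumed to be an already-constructed JournalFilesRepository, appendCycle is a hypothetical name, and error handling is left to the caller.

   static JournalFile appendCycle(final JournalFilesRepository repository) throws Exception
   {
      // Returns a pre-opened file immediately in the common case; the pushOpenRunnable
      // task keeps the openedFiles queue topped up in the background.
      JournalFile file = repository.openFile();

      // ... append records through file.getFile() ...

      // Hand the filled file back: the factory's write buffer is deactivated and the file
      // joins the dataFiles queue, where it becomes a candidate for compacting and reclaiming.
      repository.closeFile(file);

      return file;
   }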

