activemq-commits mailing list archives

From andytay...@apache.org
Subject [09/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:39 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java
new file mode 100644
index 0000000..54ab464
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractJournalUpdateTask.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord;
+import org.apache.activemq6.utils.ConcurrentHashSet;
+
+/**
+ *
+ * Super class for journal maintenance tasks such as clean-up and the Compactor.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
+   protected final JournalImpl journal;
+
+   protected final SequentialFileFactory fileFactory;
+
+   protected JournalFile currentFile;
+
+   protected SequentialFile sequentialFile;
+
+   protected final JournalFilesRepository filesRepository;
+
+   protected long nextOrderingID;
+
+   private HornetQBuffer writingChannel;
+
+   private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();
+
+   protected final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
+                                       final JournalImpl journal,
+                                       final JournalFilesRepository filesRepository,
+                                       final Set<Long> recordsSnapshot,
+                                       final long nextOrderingID)
+   {
+      super();
+      this.journal = journal;
+      this.filesRepository = filesRepository;
+      this.fileFactory = fileFactory;
+      this.nextOrderingID = nextOrderingID;
+      this.recordsSnapshot.addAll(recordsSnapshot);
+   }
+
+   // Public --------------------------------------------------------
+
+   public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
+                                                 final List<JournalFile> files,
+                                                 final List<JournalFile> newFiles,
+                                                 final List<Pair<String, String>> renames) throws Exception
+   {
+
+      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1);
+
+      try
+      {
+         controlFile.open(1, false);
+
+         JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
+
+         HornetQBuffer filesToRename = HornetQBuffers.dynamicBuffer(1);
+
+         // DataFiles first
+
+         if (files == null)
+         {
+            filesToRename.writeInt(0);
+         }
+         else
+         {
+            filesToRename.writeInt(files.size());
+
+            for (JournalFile file : files)
+            {
+               filesToRename.writeUTF(file.getFile().getFileName());
+            }
+         }
+
+         // New Files second
+
+         if (newFiles == null)
+         {
+            filesToRename.writeInt(0);
+         }
+         else
+         {
+            filesToRename.writeInt(newFiles.size());
+
+            for (JournalFile file : newFiles)
+            {
+               filesToRename.writeUTF(file.getFile().getFileName());
+            }
+         }
+
+         // Renames from clean up third
+         if (renames == null)
+         {
+            filesToRename.writeInt(0);
+         }
+         else
+         {
+            filesToRename.writeInt(renames.size());
+            for (Pair<String, String> rename : renames)
+            {
+               filesToRename.writeUTF(rename.getA());
+               filesToRename.writeUTF(rename.getB());
+            }
+         }
+
+         JournalInternalRecord controlRecord = new JournalAddRecord(true,
+                                                                    1,
+                                                                    (byte)0,
+                                                                    new ByteArrayEncoding(filesToRename.toByteBuffer()
+                                                                                                       .array()));
+
+         HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
+
+         controlRecord.setFileID(0);
+
+         controlRecord.encode(renameBuffer);
+
+         ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
+
+         writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
+
+         writeBuffer.rewind();
+
+         controlFile.writeDirect(writeBuffer, true);
+
+         return controlFile;
+      }
+      finally
+      {
+         controlFile.close();
+      }
+   }
+
+   /** Write pending output into file */
+   public void flush() throws Exception
+   {
+      if (writingChannel != null)
+      {
+         sequentialFile.position(0);
+
+         // To Fix the size of the file
+         writingChannel.writerIndex(writingChannel.capacity());
+
+         sequentialFile.writeInternal(writingChannel.toByteBuffer());
+         sequentialFile.close();
+         newDataFiles.add(currentFile);
+      }
+
+      writingChannel = null;
+   }
+
+   public boolean lookupRecord(final long id)
+   {
+      return recordsSnapshot.contains(id);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   /**
+    * @throws Exception
+    */
+
+   protected void openFile() throws Exception
+   {
+      flush();
+
+      ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+
+      writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
+
+      currentFile = filesRepository.takeFile(false, false, false, true);
+
+      sequentialFile = currentFile.getFile();
+
+      sequentialFile.open(1, false);
+
+      currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
+
+      JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
+   }
+
+   protected void addToRecordsSnaptshot(final long id)
+   {
+      recordsSnapshot.add(id);
+   }
+
+   /**
+    * @return the writingChannel
+    */
+   protected HornetQBuffer getWritingChannel()
+   {
+      return writingChannel;
+   }
+
+   protected void writeEncoder(final JournalInternalRecord record) throws Exception
+   {
+      record.setFileID(currentFile.getRecordID());
+      record.encode(getWritingChannel());
+   }
+
+   protected void writeEncoder(final JournalInternalRecord record, final int txcounter) throws Exception
+   {
+      record.setNumberOfRecords(txcounter);
+      writeEncoder(record);
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
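
For reference, a round-trip sketch of the control-file record layout written by writeControlFile above (not part of the commit): the read side simply mirrors the write order, i.e. three int-prefixed lists of UTF strings (data files, new files, rename pairs). The file names used here are placeholders.

import java.util.ArrayList;
import java.util.List;

import org.apache.activemq6.api.core.HornetQBuffer;
import org.apache.activemq6.api.core.HornetQBuffers;
import org.apache.activemq6.api.core.Pair;

public class ControlFileLayoutExample
{
   public static void main(String[] args)
   {
      // Encode the three lists in the same order writeControlFile uses...
      HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(1);

      buffer.writeInt(1);                           // data files
      buffer.writeUTF("hornetq-data-1.hq");

      buffer.writeInt(0);                           // new files

      buffer.writeInt(1);                           // rename pairs (old name, new name)
      buffer.writeUTF("hornetq-data-2.hq.cmp");
      buffer.writeUTF("hornetq-data-2.hq");

      // ...and decode them back by mirroring that order.
      List<String> dataFiles = new ArrayList<String>();
      int count = buffer.readInt();
      for (int i = 0; i < count; i++)
      {
         dataFiles.add(buffer.readUTF());
      }

      int newFileCount = buffer.readInt();          // 0 in this example

      List<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
      count = buffer.readInt();
      for (int i = 0; i < count; i++)
      {
         renames.add(new Pair<String, String>(buffer.readUTF(), buffer.readUTF()));
      }

      System.out.println(dataFiles + ", " + newFileCount + " new files, " + renames.size() + " rename(s)");
   }
}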

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java
new file mode 100644
index 0000000..1f4362a
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFile.java
@@ -0,0 +1,408 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQBuffers;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQIOErrorException;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.journal.HornetQJournalBundle;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * An AbstractSequentialFile
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public abstract class AbstractSequentialFile implements SequentialFile
+{
+
+   private File file;
+
+   private final String directory;
+
+   protected final SequentialFileFactory factory;
+
+   protected long fileSize = 0;
+
+   protected final AtomicLong position = new AtomicLong(0);
+
+   protected TimedBuffer timedBuffer;
+
+   /**
+    * Instead of having AIOSequentialFile implement the Observer, it is implemented in an inner class.
+    * This is the instance returned to the factory when the file is activated.
+    */
+   protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+
+   /**
+    * Used for asynchronous writes
+    */
+   protected final Executor writerExecutor;
+
+   /**
+    * @param file
+    * @param directory
+    */
+   public AbstractSequentialFile(final String directory,
+                                 final File file,
+                                 final SequentialFileFactory factory,
+                                 final Executor writerExecutor)
+   {
+      super();
+      this.file = file;
+      this.directory = directory;
+      this.factory = factory;
+      this.writerExecutor = writerExecutor;
+   }
+
+   // Public --------------------------------------------------------
+
+   public final boolean exists()
+   {
+      return file.exists();
+   }
+
+   public final String getFileName()
+   {
+      return file.getName();
+   }
+
+   public final void delete() throws IOException, InterruptedException, HornetQException
+   {
+      if (isOpen())
+      {
+         close();
+      }
+
+      if (file.exists() && !file.delete())
+      {
+         HornetQJournalLogger.LOGGER.errorDeletingFile(this);
+      }
+   }
+
+   public void copyTo(SequentialFile newFileName) throws Exception
+   {
+      try
+      {
+         HornetQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName);
+         if (!newFileName.isOpen())
+         {
+            newFileName.open();
+         }
+
+         if (!isOpen())
+         {
+            this.open();
+         }
+
+
+         ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+
+         for (;;)
+         {
+            buffer.rewind();
+            int size = this.read(buffer);
+            newFileName.writeDirect(buffer, false);
+            if (size < 10 * 1024)
+            {
+               break;
+            }
+         }
+         newFileName.close();
+         this.close();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+   }
+
+   /**
+    * @throws IOException declared only to satisfy the signature; sub-classes need it.
+    */
+   @Override
+   public void position(final long pos) throws IOException
+   {
+      position.set(pos);
+   }
+
+   public long position()
+   {
+      return position.get();
+   }
+
+   public final void renameTo(final String newFileName) throws IOException, InterruptedException,
+      HornetQException
+   {
+      try
+      {
+         close();
+      }
+      catch (IOException e)
+      {
+         factory.onIOError(new HornetQIOErrorException(e.getMessage(), e), e.getMessage(), this);
+         throw e;
+      }
+
+      File newFile = new File(directory + "/" + newFileName);
+
+      if (!file.equals(newFile))
+      {
+         if (!file.renameTo(newFile))
+         {
+            throw HornetQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName);
+         }
+         file = newFile;
+      }
+   }
+
+   /**
+    * @throws IOException      declared because sub-classes need to throw it
+    * @throws HornetQException
+    */
+   public synchronized void close() throws IOException, InterruptedException, HornetQException
+   {
+      final CountDownLatch donelatch = new CountDownLatch(1);
+
+      if (writerExecutor != null)
+      {
+         writerExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               donelatch.countDown();
+            }
+         });
+
+         while (!donelatch.await(60, TimeUnit.SECONDS))
+         {
+            HornetQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
+         }
+      }
+   }
+
+   public final boolean fits(final int size)
+   {
+      if (timedBuffer == null)
+      {
+         return position.get() + size <= fileSize;
+      }
+      else
+      {
+         return timedBuffer.checkSize(size);
+      }
+   }
+
+   public void setTimedBuffer(final TimedBuffer buffer)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.setObserver(null);
+      }
+
+      timedBuffer = buffer;
+
+      if (buffer != null)
+      {
+         buffer.setObserver(timedBufferObserver);
+      }
+
+   }
+
+   public void write(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
+   {
+      if (timedBuffer != null)
+      {
+         bytes.setIndex(0, bytes.capacity());
+         timedBuffer.addBytes(bytes, sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
+         buffer.put(bytes.toByteBuffer().array());
+         buffer.rewind();
+         writeDirect(buffer, sync, callback);
+      }
+   }
+
+   public void write(final HornetQBuffer bytes, final boolean sync) throws IOException, InterruptedException,
+      HornetQException
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.getInstance());
+      }
+   }
+
+   public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.addBytes(bytes, sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
+
+         // If not using the TimedBuffer, a final copy is necessary
+         // Because AIO will need a specific Buffer
+         // And NIO will also need a whole buffer to perform the write
+
+         HornetQBuffer outBuffer = HornetQBuffers.wrappedBuffer(buffer);
+         bytes.encode(outBuffer);
+         buffer.rewind();
+         writeDirect(buffer, sync, callback);
+      }
+   }
+
+   public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, HornetQException
+   {
+      if (sync)
+      {
+         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.getInstance());
+      }
+   }
+
+   protected File getFile()
+   {
+      return file;
+   }
+
+   private static final class DelegateCallback implements IOAsyncTask
+   {
+      final List<IOAsyncTask> delegates;
+
+      private DelegateCallback(final List<IOAsyncTask> delegates)
+      {
+         this.delegates = delegates;
+      }
+
+      public void done()
+      {
+         for (IOAsyncTask callback : delegates)
+         {
+            try
+            {
+               callback.done();
+            }
+            catch (Throwable e)
+            {
+               HornetQJournalLogger.LOGGER.errorCompletingCallback(e);
+            }
+         }
+      }
+
+      public void onError(final int errorCode, final String errorMessage)
+      {
+         for (IOAsyncTask callback : delegates)
+         {
+            try
+            {
+               callback.onError(errorCode, errorMessage);
+            }
+            catch (Throwable e)
+            {
+               HornetQJournalLogger.LOGGER.errorCallingErrorCallback(e);
+            }
+         }
+      }
+   }
+
+   protected ByteBuffer newBuffer(int size, int limit)
+   {
+      size = factory.calculateBlockSize(size);
+      limit = factory.calculateBlockSize(limit);
+
+      ByteBuffer buffer = factory.newBuffer(size);
+      buffer.limit(limit);
+      return buffer;
+   }
+
+   protected class LocalBufferObserver implements TimedBufferObserver
+   {
+      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
+      {
+         buffer.flip();
+
+         if (buffer.limit() == 0)
+         {
+            factory.releaseBuffer(buffer);
+         }
+         else
+         {
+            writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+         }
+      }
+
+      public ByteBuffer newBuffer(final int size, final int limit)
+      {
+         return AbstractSequentialFile.this.newBuffer(size, limit);
+      }
+
+      public int getRemainingBytes()
+      {
+         if (fileSize - position.get() > Integer.MAX_VALUE)
+         {
+            return Integer.MAX_VALUE;
+         }
+         else
+         {
+            return (int)(fileSize - position.get());
+         }
+      }
+
+      @Override
+      public String toString()
+      {
+         return "TimedBufferObserver on file (" + getFile().getName() + ")";
+      }
+
+   }
+
+   @Override
+   public File getJavaFile()
+   {
+      return getFile().getAbsoluteFile();
+   }
+}
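
A minimal usage sketch for the SequentialFile API implemented above (not part of the commit), assuming the signatures that appear in this patch: NIOSequentialFileFactory(directory, listener), createSequentialFile(name, maxIO), open(maxIO, async), write(buffer, sync) and close(). The directory and file name are placeholders.

import org.apache.activemq6.api.core.HornetQBuffer;
import org.apache.activemq6.api.core.HornetQBuffers;
import org.apache.activemq6.core.journal.SequentialFile;
import org.apache.activemq6.core.journal.SequentialFileFactory;
import org.apache.activemq6.core.journal.impl.NIOSequentialFileFactory;

public class SequentialFileExample
{
   public static void main(String[] args) throws Exception
   {
      // The journal directory is assumed to exist already (placeholder path).
      SequentialFileFactory factory = new NIOSequentialFileFactory("/tmp/journal-example", null);
      factory.start();

      SequentialFile file = factory.createSequentialFile("example-1.hq", 1);
      file.open(1, false);

      HornetQBuffer bytes = HornetQBuffers.dynamicBuffer(64);
      bytes.writeUTF("hello journal");

      file.write(bytes, true);   // synchronous: blocks until the data is written
      file.close();

      factory.stop();
   }
}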

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java
new file mode 100644
index 0000000..4a4c8f2
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/AbstractSequentialFileFactory.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq6.api.core.HornetQInterruptedException;
+import org.apache.activemq6.core.journal.IOCriticalErrorListener;
+import org.apache.activemq6.core.journal.SequentialFile;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.journal.HornetQJournalLogger;
+import org.apache.activemq6.utils.HornetQThreadFactory;
+
+/**
+ *
+ * An abstract SequentialFileFactory containing basic functionality for both the AIO and NIO SequentialFileFactory implementations
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+abstract class AbstractSequentialFileFactory implements SequentialFileFactory
+{
+
+   // Timeout used when waiting for executors to shut down
+   protected static final int EXECUTOR_TIMEOUT = 60;
+
+   protected final String journalDir;
+
+   protected final TimedBuffer timedBuffer;
+
+   protected final int bufferSize;
+
+   protected final long bufferTimeout;
+
+   private final IOCriticalErrorListener criticalErrorListener;
+
+   /**
+    * Asynchronous writes need to be done on a separate executor.
+    * With NIO this is needed so the caller's thread does not block waiting for the write to return.
+    * With AIO it is needed because context switches on writes would trigger flushes at the kernel.
+    */
+   protected ExecutorService writeExecutor;
+
+   AbstractSequentialFileFactory(final String journalDir,
+                                        final boolean buffered,
+                                        final int bufferSize,
+                                        final int bufferTimeout,
+                                        final boolean logRates,
+                                        final IOCriticalErrorListener criticalErrorListener)
+   {
+      this.journalDir = journalDir;
+
+      if (buffered)
+      {
+         timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
+      }
+      else
+      {
+         timedBuffer = null;
+      }
+      this.bufferSize = bufferSize;
+      this.bufferTimeout = bufferTimeout;
+      this.criticalErrorListener = criticalErrorListener;
+   }
+
+   public void stop()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.stop();
+      }
+
+      if (isSupportsCallbacks() && writeExecutor != null)
+      {
+         writeExecutor.shutdown();
+
+         try
+         {
+            if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+            {
+               HornetQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace"));
+            }
+         }
+         catch (InterruptedException e)
+         {
+            throw new HornetQInterruptedException(e);
+         }
+      }
+   }
+
+   public String getDirectory()
+   {
+      return journalDir;
+   }
+
+   public void start()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.start();
+      }
+
+      if (isSupportsCallbacks())
+      {
+         writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
+                                                                                    true,
+                                                                                    AbstractSequentialFileFactory.getThisClassLoader()));
+      }
+
+   }
+
+   @Override
+   public void onIOError(Exception exception, String message, SequentialFile file)
+   {
+      if (criticalErrorListener != null)
+      {
+         criticalErrorListener.onIOException(exception, message, file);
+      }
+   }
+
+   @Override
+   public void activateBuffer(final SequentialFile file)
+   {
+      if (timedBuffer != null)
+      {
+         file.setTimedBuffer(timedBuffer);
+      }
+   }
+
+   public void flush()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.flush();
+      }
+   }
+
+   public void deactivateBuffer()
+   {
+      if (timedBuffer != null)
+      {
+         // When moving to a new file, make sure any data pending in the timed buffer is flushed to the current file
+         timedBuffer.flush();
+         timedBuffer.setObserver(null);
+      }
+   }
+
+   public void releaseBuffer(final ByteBuffer buffer)
+   {
+   }
+
+   /**
+    * Create the directory if it doesn't exist yet
+    */
+   public void createDirs() throws Exception
+   {
+      File file = new File(journalDir);
+      boolean ok = file.mkdirs();
+      if (!ok)
+      {
+         throw new IOException("Failed to create directory " + journalDir);
+      }
+   }
+
+   public List<String> listFiles(final String extension) throws Exception
+   {
+      File dir = new File(journalDir);
+
+      FilenameFilter fnf = new FilenameFilter()
+      {
+         public boolean accept(final File file, final String name)
+         {
+            return name.endsWith("." + extension);
+         }
+      };
+
+      String[] fileNames = dir.list(fnf);
+
+      if (fileNames == null)
+      {
+         throw new IOException("Failed to list: " + journalDir);
+      }
+
+      return Arrays.asList(fileNames);
+   }
+
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return AbstractSequentialFileFactory.class.getClassLoader();
+         }
+      });
+
+   }
+
+}
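
A short sketch of the listFiles helper above (not part of the commit); the directory path and file extension are placeholders.

import org.apache.activemq6.core.journal.impl.NIOSequentialFileFactory;

public class ListJournalFilesExample
{
   public static void main(String[] args) throws Exception
   {
      NIOSequentialFileFactory factory = new NIOSequentialFileFactory("/data/journal-example", null);

      // Prints every file name in the journal directory ending in ".hq"
      for (String name : factory.listFiles("hq"))
      {
         System.out.println(name);
      }
   }
}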

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java
new file mode 100644
index 0000000..849219a
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/CompactJournal.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.core.journal.IOCriticalErrorListener;
+
+/**
+ * This is an undocumented utility class that will open a journal and force it to compact.
+ * <p>
+ * It may be useful in special cases, but it shouldn't be needed under regular circumstances, as the
+ * system detects when compacting is required. The usual approach is to configure the min-compact
+ * parameters instead.
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public final class CompactJournal // NO_UCD
+{
+
+   public static void main(final String[] arg)
+   {
+      if (arg.length != 4)
+      {
+         System.err.println("Use: java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>");
+         return;
+      }
+
+      try
+      {
+         CompactJournal.compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), null);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+   }
+
+   static void compactJournal(final String directory,
+                                     final String journalPrefix,
+                                     final String journalSuffix,
+                                     final int minFiles,
+                                     final int fileSize,
+                                     final IOCriticalErrorListener listener) throws Exception
+   {
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+      journal.start();
+
+      journal.loadInternalOnly();
+
+      journal.compact();
+
+      journal.stop();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java
new file mode 100644
index 0000000..c2eaabed
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/DummyCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.journal.HornetQJournalLogger;
+
+/**
+ * A DummyCallback
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+class DummyCallback extends SyncIOCompletion
+{
+   private static final DummyCallback instance = new DummyCallback();
+
+   public static DummyCallback getInstance()
+   {
+      return DummyCallback.instance;
+   }
+
+   public void done()
+   {
+   }
+
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      HornetQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode);
+   }
+
+   @Override
+   public void waitCompletion() throws Exception
+   {
+   }
+
+   @Override
+   public void storeLineUp()
+   {
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java
new file mode 100644
index 0000000..aaad661
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ExportJournal.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.activemq6.core.journal.RecordInfo;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.utils.Base64;
+
+/**
+ * Use this class to export the journal data. You can use it as a main class or through its static method {@link ExportJournal#exportJournal(String, String, String, int, int, String)}
+ *
+ * If you use the main method, the arguments are: <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>
+ *
+ * Example: java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.ExportJournal /journalDir hornetq-data hq 10485760 /tmp/export.dat
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ExportJournal
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public static void main(final String[] arg)
+   {
+      if (arg.length != 5)
+      {
+         System.err.println("Use: java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
+         return;
+      }
+
+      try
+      {
+         ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+   }
+
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileOutput) throws Exception
+   {
+
+      FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
+
+      BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
+
+      PrintStream out = new PrintStream(buffOut);
+
+      ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+
+      out.close();
+   }
+
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final PrintStream out) throws Exception
+   {
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+      List<JournalFile> files = journal.orderFiles();
+
+      for (JournalFile file : files)
+      {
+         out.println("#File," + file);
+
+         ExportJournal.exportJournalFile(out, nio, file);
+      }
+   }
+
+   /**
+    * @param out
+    * @param fileFactory
+    * @param file
+    * @throws Exception
+    */
+   public static void exportJournalFile(final PrintStream out,
+                                        final SequentialFileFactory fileFactory,
+                                        final JournalFile file) throws Exception
+   {
+      JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+      {
+
+         public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
+         }
+
+         public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
+         }
+
+         public void onReadRollbackRecord(final long transactionID) throws Exception
+         {
+            out.println("operation@Rollback,txID@" + transactionID);
+         }
+
+         public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+         {
+            out.println("operation@Prepare,txID@" + transactionID +
+                        ",numberOfRecords@" +
+                        numberOfRecords +
+                        ",extraData@" +
+                        ExportJournal.encode(extraData));
+         }
+
+         public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@DeleteRecordTX,txID@" + transactionID +
+                        "," +
+                        ExportJournal.describeRecord(recordInfo));
+         }
+
+         public void onReadDeleteRecord(final long recordID) throws Exception
+         {
+            out.println("operation@DeleteRecord,id@" + recordID);
+         }
+
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+         {
+            out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
+         }
+
+         public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
+         }
+
+         public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
+         }
+
+         public void markAsDataFile(final JournalFile file)
+         {
+         }
+      });
+   }
+
+   private static String describeRecord(final RecordInfo recordInfo)
+   {
+      return "id@" + recordInfo.id +
+             ",userRecordType@" +
+             recordInfo.userRecordType +
+             ",length@" +
+             recordInfo.data.length +
+             ",isUpdate@" +
+             recordInfo.isUpdate +
+             ",compactCount@" +
+             recordInfo.compactCount +
+             ",data@" +
+             ExportJournal.encode(recordInfo.data);
+   }
+
+   private static String encode(final byte[] data)
+   {
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
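
A sketch of driving the export above programmatically rather than through main() (not part of the commit); the overload used is exportJournal(directory, prefix, extension, minFiles, fileSize, PrintStream) as defined in this file, and the directory, prefix and sizes are placeholders.

import java.io.PrintStream;

import org.apache.activemq6.core.journal.impl.ExportJournal;

public class ExportExample
{
   public static void main(String[] args) throws Exception
   {
      // Dumps every record of an existing journal in the text format
      // parsed back by ImportJournal (one "operation@..." line per record).
      PrintStream out = new PrintStream(System.out);

      ExportJournal.exportJournal("/data/journal",   // journal directory (placeholder)
                                  "hornetq-data",    // journal file prefix
                                  "hq",              // journal file extension
                                  2,                 // minFiles
                                  10485760,          // fileSize (10 MiB)
                                  out);
   }
}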

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java
new file mode 100644
index 0000000..c7eb8e0
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/FileWrapperJournal.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.HornetQUnsupportedPacketException;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.IOCompletion;
+import org.apache.activemq6.core.journal.Journal;
+import org.apache.activemq6.core.journal.JournalLoadInformation;
+import org.apache.activemq6.core.journal.LoaderCallback;
+import org.apache.activemq6.core.journal.PreparedTransactionInfo;
+import org.apache.activemq6.core.journal.RecordInfo;
+import org.apache.activemq6.core.journal.SequentialFileFactory;
+import org.apache.activemq6.core.journal.TransactionFailureCallback;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalCompleteRecordTX.TX_RECORD_TYPE;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.apache.activemq6.core.journal.impl.dataformat.JournalInternalRecord;
+
+/**
+ * Journal used on a replicating backup server during the synchronization of data with the 'live'
+ * server. It just wraps a single {@link JournalFile}.
+ * <p/>
+ * Its main purpose is to store the data as a Journal would, but without verifying records.
+ */
+public final class FileWrapperJournal extends JournalBase
+{
+   private final ReentrantLock lockAppend = new ReentrantLock();
+
+   private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
+   private final JournalImpl journal;
+   protected volatile JournalFile currentFile;
+
+   /**
+    * @param journal
+    * @throws Exception
+    */
+   public FileWrapperJournal(Journal journal) throws Exception
+   {
+      super(journal.getFileFactory().isSupportsCallbacks(), journal.getFileSize());
+      this.journal = (JournalImpl)journal;
+      currentFile = this.journal.setUpCurrentFile(JournalImpl.SIZE_HEADER);
+   }
+
+   @Override
+   public void start() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void stop() throws Exception
+   {
+      if (currentFile.getFile().isOpen())
+         currentFile.getFile().close();
+   }
+
+   @Override
+   public boolean isStarted()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   // ------------------------
+
+   // ------------------------
+
+   @Override
+   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback) throws Exception
+   {
+      JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+
+      writeRecord(addRecord, sync, callback);
+   }
+
+   /**
+    * Write the record to the current file.
+    */
+   private void writeRecord(JournalInternalRecord encoder, final boolean sync, final IOCompletion callback) throws Exception
+   {
+
+      lockAppend.lock();
+      try
+      {
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+         currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
+         encoder.setFileID(currentFile.getRecordID());
+
+         if (callback != null)
+         {
+            currentFile.getFile().write(encoder, sync, callback);
+         }
+         else
+         {
+            currentFile.getFile().write(encoder, sync);
+         }
+      }
+      finally
+      {
+         lockAppend.unlock();
+      }
+   }
+
+   @Override
+   public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception
+   {
+      JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+      writeRecord(deleteRecord, sync, callback);
+   }
+
+   @Override
+   public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+   {
+      count(txID);
+      JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+      writeRecord(deleteRecordTX, false, null);
+   }
+
+   @Override
+   public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      count(txID);
+      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+      writeRecord(addRecord, false, null);
+   }
+
+   @Override
+   public void
+   appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback) throws Exception
+   {
+      JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+      writeRecord(updateRecord, sync, callback);
+   }
+
+   @Override
+   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      count(txID);
+      JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+      writeRecord(updateRecordTX, false, null);
+   }
+
+   @Override
+   public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+   {
+      JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
+      AtomicInteger value = transactions.remove(Long.valueOf(txID));
+      if (value != null)
+      {
+         commitRecord.setNumberOfRecords(value.get());
+      }
+
+      writeRecord(commitRecord, true, callback);
+   }
+
+   @Override
+   public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception
+   {
+      JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+      AtomicInteger value = transactions.get(Long.valueOf(txID));
+      if (value != null)
+      {
+         prepareRecord.setNumberOfRecords(value.get());
+      }
+      writeRecord(prepareRecord, sync, callback);
+   }
+
+   private int count(long txID) throws HornetQException
+   {
+      AtomicInteger defaultValue = new AtomicInteger(1);
+      AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
+      if (count != null)
+      {
+         return count.incrementAndGet();
+      }
+      return defaultValue.get();
+   }
+
+   @Override
+   public String toString()
+   {
+      return FileWrapperJournal.class.getName() + "(currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
+   }
+
+   // UNSUPPORTED STUFF
+
+   @Override
+   public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+   {
+      throw new HornetQUnsupportedPacketException();
+   }
+
+   @Override
+   public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
+   {
+      throw new HornetQUnsupportedPacketException();
+   }
+
+   @Override
+   public JournalLoadInformation loadInternalOnly() throws Exception
+   {
+      throw new HornetQUnsupportedPacketException();
+   }
+
+   @Override
+   public void lineUpContext(IOCompletion callback)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalLoadInformation load(List<RecordInfo> committedRecords,
+                                      List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception
+   {
+      throw new HornetQUnsupportedPacketException();
+   }
+
+   @Override
+   public int getAlignment() throws Exception
+   {
+      throw new HornetQUnsupportedPacketException();
+   }
+
+   @Override
+   public int getNumberOfRecords()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int getUserVersion()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void perfBlast(int pages)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void runDirectJournalBlast() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalLoadInformation loadSyncOnly(JournalState state) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void synchronizationLock()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void synchronizationUnlock()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void forceMoveNextFile()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalFile[] getDataFiles()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   void scheduleReclaim()
+   {
+      // no-op
+   }
+
+   @Override
+   public SequentialFileFactory getFileFactory()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void scheduleCompactAndBlock(int timeout) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void replicationSyncPreserveOldFiles()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void replicationSyncFinished()
+   {
+      throw new UnsupportedOperationException();
+   }
+}
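
The per-transaction record counting used by appendCommitRecord/appendPrepareRecord above relies on ConcurrentMap.putIfAbsent; below is a standalone illustration of the same idiom (not part of the commit; class and method names are made up).

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TxCounterExample
{
   private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();

   /** Returns how many records have been appended for txID, counting this one. */
   int count(long txID)
   {
      AtomicInteger defaultValue = new AtomicInteger(1);
      AtomicInteger existing = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
      return existing != null ? existing.incrementAndGet() : defaultValue.get();
   }

   public static void main(String[] args)
   {
      TxCounterExample counter = new TxCounterExample();
      counter.count(7L);                     // 1
      counter.count(7L);                     // 2
      System.out.println(counter.count(7L)); // prints 3
   }
}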

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java
new file mode 100644
index 0000000..314fb94
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/ImportJournal.java
@@ -0,0 +1,388 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq6.core.journal.RecordInfo;
+import org.apache.activemq6.utils.Base64;
+
+/**
+ * Use this class to import journal data from a file produced by ExportJournal. You can use it as a
+ * main class or through its static method
+ * {@link ImportJournal#importJournal(String, String, String, int, int, String)}
+ * <p>
+ * If you use the main method, use its arguments as:
+ *
+ * <pre>
+ * JournalDirectory JournalPrefix FileExtension FileSize FileInput
+ * </pre>
+ * <p>
+ * Example:
+ *
+ * <pre>
+ * java -cp hornetq-core.jar org.apache.activemq6.core.journal.impl.ImportJournal /journalDir hornetq-data hq 10485760 /tmp/export.dat
+ * </pre>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ImportJournal
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public static void main(final String[] arg)
+   {
+      if (arg.length != 5)
+      {
+         System.err.println("Use: java -cp hornetq-core.jar:netty.jar org.apache.activemq6.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
+         return;
+      }
+
+      try
+      {
+         ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+   }
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileInput) throws Exception
+   {
+      FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
+      ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+
+   }
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final InputStream stream) throws Exception
+   {
+      Reader reader = new InputStreamReader(stream);
+      ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+   }
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final Reader reader) throws Exception
+   {
+
+      File journalDir = new File(directory);
+
+      if (!journalDir.exists())
+      {
+         if (!journalDir.mkdirs())
+         {
+            System.err.println("Could not create directory " + directory);
+         }
+      }
+
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+      if (journal.orderFiles().size() != 0)
+      {
+         throw new IllegalStateException("Import needs to create a brand new journal");
+      }
+
+      journal.start();
+
+      // The journal is empty, as we checked already. Calling load just to initialize the internal data
+      journal.loadInternalOnly();
+
+      BufferedReader buffReader = new BufferedReader(reader);
+
+      String line;
+
+      HashMap<Long, AtomicInteger> txCounters = new HashMap<Long, AtomicInteger>();
+
+      long lineNumber = 0;
+
+      Map<Long, JournalRecord> journalRecords = journal.getRecords();
+
+      while ((line = buffReader.readLine()) != null)
+      {
+         lineNumber++;
+         String[] splitLine = line.split(",");
+         if (splitLine[0].equals("#File"))
+         {
+            txCounters.clear();
+            continue;
+         }
+
+         Properties lineProperties = ImportJournal.parseLine(splitLine);
+
+         String operation = null;
+         try
+         {
+            operation = lineProperties.getProperty("operation");
+
+            if (operation.equals("AddRecord"))
+            {
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
+               journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
+            }
+            else if (operation.equals("AddRecordTX"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               counter.incrementAndGet();
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
+               journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            }
+            else if (operation.equals("AddRecordTX"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               counter.incrementAndGet();
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
+               journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            }
+            else if (operation.equals("UpdateTX"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               counter.incrementAndGet();
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
+               journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            }
+            else if (operation.equals("Update"))
+            {
+               RecordInfo info = ImportJournal.parseRecord(lineProperties);
+               journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+            }
+            else if (operation.equals("DeleteRecord"))
+            {
+               long id = ImportJournal.parseLong("id", lineProperties);
+
+               // If not found it means the append/update records were reclaimed already
+               if (journalRecords.get(id) != null)
+               {
+                  journal.appendDeleteRecord(id, false);
+               }
+            }
+            else if (operation.equals("DeleteRecordTX"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               long id = ImportJournal.parseLong("id", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               counter.incrementAndGet();
+
+               // If not found it means the append/update records were reclaimed already
+               if (journalRecords.get(id) != null)
+               {
+                  journal.appendDeleteRecordTransactional(txID, id);
+               }
+            }
+            else if (operation.equals("Prepare"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
+
+               if (counter.get() == numberOfRecords)
+               {
+                  journal.appendPrepareRecord(txID, data, false);
+               }
+               else
+               {
+                  System.err.println("Transaction " + txID +
+                                     " at line " +
+                                     lineNumber +
+                                     " is incomplete. The prepare record expected " +
+                                     numberOfRecords +
+                                     " while the import only had " +
+                                     counter);
+               }
+            }
+            else if (operation.equals("Commit"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+               AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+               if (counter.get() == numberOfRecords)
+               {
+                  journal.appendCommitRecord(txID, false);
+               }
+               else
+               {
+                  System.err.println("Transaction " + txID +
+                                     " at line " +
+                                     lineNumber +
+                                     " is incomplete. The commit record expected " +
+                                     numberOfRecords +
+                                     " while the import only had " +
+                                     counter);
+               }
+            }
+            else if (operation.equals("Rollback"))
+            {
+               long txID = ImportJournal.parseLong("txID", lineProperties);
+               journal.appendRollbackRecord(txID, false);
+            }
+            else
+            {
+               System.err.println("Invalid opeartion " + operation + " at line " + lineNumber);
+            }
+         }
+         catch (Exception ex)
+         {
+            System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage());
+         }
+      }
+
+      journal.stop();
+   }
+
+   protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
+   {
+
+      AtomicInteger counter = txCounters.get(txID);
+      if (counter == null)
+      {
+         counter = new AtomicInteger(0);
+         txCounters.put(txID, counter);
+      }
+
+      return counter;
+   }
+
+   protected static RecordInfo parseRecord(final Properties properties) throws Exception
+   {
+      long id = ImportJournal.parseLong("id", properties);
+      byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
+      boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
+      byte[] data = ImportJournal.parseEncoding("data", properties);
+      return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
+   }
+
+   private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
+   {
+      String value = ImportJournal.parseString(name, properties);
+
+      return ImportJournal.decode(value);
+   }
+
+   /**
+    * @param name the property key to read
+    * @param properties the properties parsed from the current line
+    * @return the value of the property parsed as an {@code int}
+    */
+   private static int parseInt(final String name, final Properties properties) throws Exception
+   {
+      String value = ImportJournal.parseString(name, properties);
+
+      return Integer.parseInt(value);
+   }
+
+   private static long parseLong(final String name, final Properties properties) throws Exception
+   {
+      String value = ImportJournal.parseString(name, properties);
+
+      return Long.parseLong(value);
+   }
+
+   private static boolean parseBoolean(final String name, final Properties properties) throws Exception
+   {
+      String value = ImportJournal.parseString(name, properties);
+
+      return Boolean.parseBoolean(value);
+   }
+
+   private static byte parseByte(final String name, final Properties properties) throws Exception
+   {
+      String value = ImportJournal.parseString(name, properties);
+
+      return Byte.parseByte(value);
+   }
+
+   /**
+    * @param name the property key to read
+    * @param properties the properties parsed from the current line
+    * @return the value of the property
+    * @throws Exception if the property is not present on the line
+    */
+   private static String parseString(final String name, final Properties properties) throws Exception
+   {
+      String value = properties.getProperty(name);
+
+      if (value == null)
+      {
+         throw new Exception("property " + name + " not found");
+      }
+      return value;
+   }
+
+   protected static Properties parseLine(final String[] splitLine)
+   {
+      Properties properties = new Properties();
+
+      for (String el : splitLine)
+      {
+         String[] tuple = el.split("@");
+         if (tuple.length == 2)
+         {
+            properties.put(tuple[0], tuple[1]);
+         }
+         else
+         {
+            properties.put(tuple[0], tuple[0]);
+         }
+      }
+
+      return properties;
+   }
+
+   private static byte[] decode(final String data)
+   {
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
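
A minimal sketch of driving the importer above programmatically, assuming an export whose lines use the comma-separated key@value layout that parseLine expects. The prefix, extension and file size mirror the javadoc example; the record payload and target path are hypothetical, and the target directory must not already contain journal files.

import java.io.StringReader;

import org.apache.activemq6.core.journal.impl.ImportJournal;

public class ImportJournalUsageSketch
{
   public static void main(String[] args) throws Exception
   {
      // One exported record per line: comma-separated key@value pairs, as consumed by parseLine.
      // "QUJD" is simply URL-safe Base64 for "ABC"; any valid payload would do.
      String exported = "operation@AddRecord,id@1,userRecordType@2,isUpdate@false,data@QUJD\n" +
                        "operation@DeleteRecord,id@1\n";

      // Replays the exported records into a brand new journal (hypothetical, empty directory).
      ImportJournal.importJournal("/tmp/import-journal", "hornetq-data", "hq", 2, 10485760,
                                  new StringReader(exported));
   }
}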

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java
new file mode 100644
index 0000000..04e5a22
--- /dev/null
+++ b/activemq6-journal/src/main/java/org/apache/activemq6/core/journal/impl/JournalBase.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.journal.impl;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.journal.IOCompletion;
+import org.apache.activemq6.core.journal.Journal;
+import org.apache.activemq6.core.journal.impl.dataformat.ByteArrayEncoding;
+
+abstract class JournalBase implements Journal
+{
+
+   protected final int fileSize;
+   private final boolean supportsCallback;
+
+   public JournalBase(boolean supportsCallback, int fileSize)
+   {
+      if (fileSize < JournalImpl.MIN_FILE_SIZE)
+      {
+         throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
+      }
+      this.supportsCallback = supportsCallback;
+      this.fileSize = fileSize;
+   }
+
+   public abstract void appendAddRecord(final long id, final byte recordType, final EncodingSupport record,
+                                        final boolean sync, final IOCompletion callback) throws Exception;
+
+   public abstract void appendAddRecordTransactional(final long txID, final long id, final byte recordType,
+                                                     final EncodingSupport record) throws Exception;
+
+   public abstract void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback,
+                                           boolean lineUpContext) throws Exception;
+
+   public abstract void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception;
+
+   public abstract void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception;
+
+   public abstract void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync,
+                                            final IOCompletion callback) throws Exception;
+
+   public abstract void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record,
+                                           final boolean sync, final IOCompletion callback) throws Exception;
+
+   public abstract void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType,
+                                                        final EncodingSupport record) throws Exception;
+
+   public abstract void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception;
+
+
+   public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   {
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
+   }
+
+   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   {
+      SyncIOCompletion callback = getSyncCallback(sync);
+
+      appendAddRecord(id, recordType, record, sync, callback);
+
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+   }
+
+   public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+   {
+      SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+      appendCommitRecord(txID, sync, syncCompletion, true);
+
+      if (syncCompletion != null)
+      {
+         syncCompletion.waitCompletion();
+      }
+   }
+
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      appendCommitRecord(txID, sync, callback, true);
+   }
+
+   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
+   {
+      appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
+   }
+
+   public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType,
+                                               final byte[] record) throws Exception
+   {
+      appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+   }
+
+   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+   {
+      appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+   }
+
+   public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+   {
+      appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
+   }
+
+   public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
+   {
+      appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
+   }
+
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+   {
+      SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+      appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+
+      if (syncCompletion != null)
+      {
+         syncCompletion.waitCompletion();
+      }
+   }
+
+   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
+   {
+      appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
+   }
+
+   public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
+   {
+      SyncIOCompletion callback = getSyncCallback(sync);
+
+      appendUpdateRecord(id, recordType, record, sync, callback);
+
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+   }
+
+   public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+   {
+      SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+      appendRollbackRecord(txID, sync, syncCompletion);
+
+      if (syncCompletion != null)
+      {
+         syncCompletion.waitCompletion();
+      }
+
+   }
+
+   public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+   {
+      SyncIOCompletion callback = getSyncCallback(sync);
+
+      appendDeleteRecord(id, sync, callback);
+
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
+   }
+
+   abstract void scheduleReclaim();
+
+   protected SyncIOCompletion getSyncCallback(final boolean sync)
+   {
+      if (supportsCallback)
+      {
+         if (sync)
+         {
+            return new SimpleWaitIOCallback();
+         }
+         return DummyCallback.getInstance();
+      }
+      return null;
+   }
+
+   private static final class NullEncoding implements EncodingSupport
+   {
+
+      private static NullEncoding instance = new NullEncoding();
+
+      public void decode(final HornetQBuffer buffer)
+      {
+         // no-op
+      }
+
+      public void encode(final HornetQBuffer buffer)
+      {
+         // no-op
+      }
+
+      public int getEncodeSize()
+      {
+         return 0;
+      }
+   }
+
+   public int getFileSize()
+   {
+      return fileSize;
+   }
+}
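
Every synchronous append in JournalBase follows the same wrapper: obtain a completion callback from getSyncCallback (null when callbacks are unsupported), delegate to the abstract asynchronous variant, then block on waitCompletion. The self-contained toy below illustrates that sync-over-async wrapper; the Completion interface, WaitingCompletion class and AsyncAppender class are hypothetical stand-ins for illustration only, not part of the journal API.

import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the IOCompletion/SyncIOCompletion pair used above.
interface Completion
{
   void done();
}

final class WaitingCompletion implements Completion
{
   private final CountDownLatch latch = new CountDownLatch(1);

   public void done()
   {
      latch.countDown();
   }

   public void waitCompletion() throws InterruptedException
   {
      latch.await();
   }
}

abstract class AsyncAppender
{
   // Asynchronous primitive supplied by the concrete implementation,
   // analogous to the abstract appendXXXRecord(..., IOCompletion) methods.
   protected abstract void append(byte[] record, Completion callback);

   // Synchronous convenience wrapper, analogous to appendAddRecord(id, type, record, sync):
   // create a waiting callback, delegate, then block until the write completes.
   public void appendBlocking(byte[] record) throws InterruptedException
   {
      WaitingCompletion callback = new WaitingCompletion();
      append(record, callback);
      callback.waitCompletion();
   }
}

In the real class, getSyncCallback additionally hands back a no-op DummyCallback for asynchronous appends when callbacks are supported, so implementations can rely on receiving a non-null callback in that configuration.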

