Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 119CA10A70 for ; Thu, 26 Feb 2015 09:26:02 +0000 (UTC) Received: (qmail 78426 invoked by uid 500); 26 Feb 2015 09:26:01 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 78281 invoked by uid 500); 26 Feb 2015 09:26:01 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 78261 invoked by uid 99); 26 Feb 2015 09:26:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Feb 2015 09:26:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B95F3E0A46; Thu, 26 Feb 2015 09:26:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Thu, 26 Feb 2015 09:26:02 -0000 Message-Id: <15fbea2e11fe4bea80dfc30207d192d4@git.apache.org> In-Reply-To: <2a31f8b51ac845aca133b4935e8bbb97@git.apache.org> References: <2a31f8b51ac845aca133b4935e8bbb97@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] activemq-6 git commit: moving export-journal / import-journal to the proper tools place moving export-journal / import-journal to the proper tools place This is simply moving the export/import journal to its proper place. 
The previous commit should have added docs about this Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/210222e2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/210222e2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/210222e2 Branch: refs/heads/master Commit: 210222e24fa4cc7e8b184ca1c3d4a9bc52d560bc Parents: ed7c429 Author: Clebert Suconic Authored: Wed Feb 25 22:45:21 2015 -0500 Committer: Clebert Suconic Committed: Thu Feb 26 00:26:52 2015 -0500 ---------------------------------------------------------------------- .../core/journal/impl/ExportJournal.java | 210 ---------- .../core/journal/impl/ImportJournal.java | 392 ------------------ .../apache/activemq/tools/ExportJournal.java | 226 ++++++++++ .../apache/activemq/tools/ImportJournal.java | 413 +++++++++++++++++++ .../java/org/apache/activemq/tools/Main.java | 16 +- .../journal/NIOJournalCompactTest.java | 2 +- .../persistence/ExportFormatTest.java | 6 +- tests/unit-tests/pom.xml | 5 + .../core/journal/impl/JournalImplTestBase.java | 4 +- 9 files changed, 665 insertions(+), 609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ExportJournal.java ---------------------------------------------------------------------- diff --git a/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ExportJournal.java b/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ExportJournal.java deleted file mode 100644 index e00204a..0000000 --- a/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ExportJournal.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.core.journal.impl; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.List; - -import org.apache.activemq.core.journal.RecordInfo; -import org.apache.activemq.core.journal.SequentialFileFactory; -import org.apache.activemq.utils.Base64; - -/** - * Use this class to export the journal data. 
You can use it as a main class or through its native method {@link ExportJournal#exportJournal(String, String, String, int, int, String)} - * - * If you use the main method, use it as - * - * Example: java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal /journalDir activemq-data amq 2 10485760 /tmp/export.dat - * - * @author Clebert Suconic - * - * - */ -public class ExportJournal -{ - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - public static void main(final String[] arg) - { - if (arg.length != 5) - { - System.err.println("Use: java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal "); - return; - } - - try - { - ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]); - } - catch (Exception e) - { - e.printStackTrace(); - } - - } - - public static void exportJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final String fileOutput) throws Exception - { - - FileOutputStream fileOut = new FileOutputStream(new File(fileOutput)); - - BufferedOutputStream buffOut = new BufferedOutputStream(fileOut); - - PrintStream out = new PrintStream(buffOut); - - ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out); - - out.close(); - } - - public static void exportJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final PrintStream out) throws Exception - { - NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null); - - JournalImpl journal = new 
JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); - - List files = journal.orderFiles(); - - for (JournalFile file : files) - { - out.println("#File," + file); - - ExportJournal.exportJournalFile(out, nio, file); - } - } - - /** - * @param out - * @param fileFactory - * @param file - * @throws Exception - */ - public static void exportJournalFile(final PrintStream out, - final SequentialFileFactory fileFactory, - final JournalFile file) throws Exception - { - JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() - { - - public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception - { - out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo)); - } - - public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception - { - out.println("operation@Update," + ExportJournal.describeRecord(recordInfo)); - } - - public void onReadRollbackRecord(final long transactionID) throws Exception - { - out.println("operation@Rollback,txID@" + transactionID); - } - - public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception - { - out.println("operation@Prepare,txID@" + transactionID + - ",numberOfRecords@" + - numberOfRecords + - ",extraData@" + - ExportJournal.encode(extraData)); - } - - public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception - { - out.println("operation@DeleteRecordTX,txID@" + transactionID + - "," + - ExportJournal.describeRecord(recordInfo)); - } - - public void onReadDeleteRecord(final long recordID) throws Exception - { - out.println("operation@DeleteRecord,id@" + recordID); - } - - public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception - { - out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords); - } - - public 
void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception - { - out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo)); - } - - public void onReadAddRecord(final RecordInfo recordInfo) throws Exception - { - out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo)); - } - - public void markAsDataFile(final JournalFile file) - { - } - }); - } - - private static String describeRecord(final RecordInfo recordInfo) - { - return "id@" + recordInfo.id + - ",userRecordType@" + - recordInfo.userRecordType + - ",length@" + - recordInfo.data.length + - ",isUpdate@" + - recordInfo.isUpdate + - ",compactCount@" + - recordInfo.compactCount + - ",data@" + - ExportJournal.encode(recordInfo.data); - } - - private static String encode(final byte[] data) - { - return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ImportJournal.java ---------------------------------------------------------------------- diff --git a/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ImportJournal.java b/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ImportJournal.java deleted file mode 100644 index 155aeae..0000000 --- a/activemq-journal/src/main/java/org/apache/activemq/core/journal/impl/ImportJournal.java +++ /dev/null @@ -1,392 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.core.journal.impl; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.core.journal.RecordInfo; -import org.apache.activemq.utils.Base64; - -/** - * Use this class to import the journal data from a listed file. You can use it as a main class or - * through its native method - * {@link ImportJournal#importJournal(String, String, String, int, int, String)} - *

- * If you use the main method, use its arguments as: - * - *

- * JournalDirectory JournalPrefix FileExtension MinFiles FileSize FileOutput
- * 
- *

- * Example: - * - *

- * java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
- * 
- * @author Clebert Suconic - */ -public class ImportJournal -{ - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - public static void main(final String[] arg) - { - if (arg.length != 5) - { - System.err.println("Use: java -cp activemq-core.jar:netty.jar org.apache.activemq.core.journal.impl.ImportJournal "); - return; - } - - try - { - ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]); - } - catch (Exception e) - { - e.printStackTrace(); - } - - } - - public static void importJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final String fileInput) throws Exception - { - FileInputStream fileInputStream = new FileInputStream(new File(fileInput)); - ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream); - - } - - public static void importJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final InputStream stream) throws Exception - { - Reader reader = new InputStreamReader(stream); - ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader); - } - - public static void importJournal(final String directory, - final String journalPrefix, - final String journalSuffix, - final int minFiles, - final int fileSize, - final Reader reader) throws Exception - { - - File journalDir = new File(directory); - - if (!journalDir.exists()) - { - if (!journalDir.mkdirs()) - System.err.println("Could not create directory " + directory); - } - - NIOSequentialFileFactory nio = new 
NIOSequentialFileFactory(directory, null); - - JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); - - if (journal.orderFiles().size() != 0) - { - throw new IllegalStateException("Import needs to create a brand new journal"); - } - - journal.start(); - - // The journal is empty, as we checked already. Calling load just to initialize the internal data - journal.loadInternalOnly(); - - BufferedReader buffReader = new BufferedReader(reader); - - String line; - - HashMap txCounters = new HashMap(); - - long lineNumber = 0; - - Map journalRecords = journal.getRecords(); - - while ((line = buffReader.readLine()) != null) - { - lineNumber++; - String[] splitLine = line.split(","); - if (splitLine[0].equals("#File")) - { - txCounters.clear(); - continue; - } - - Properties lineProperties = ImportJournal.parseLine(splitLine); - - String operation = null; - try - { - operation = lineProperties.getProperty("operation"); - - if (operation.equals("AddRecord")) - { - RecordInfo info = ImportJournal.parseRecord(lineProperties); - journal.appendAddRecord(info.id, info.userRecordType, info.data, false); - } - else if (operation.equals("AddRecordTX")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = ImportJournal.parseRecord(lineProperties); - journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); - } - else if (operation.equals("AddRecordTX")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = ImportJournal.parseRecord(lineProperties); - journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); - } - else if (operation.equals("UpdateTX")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - 
AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = ImportJournal.parseRecord(lineProperties); - journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data); - } - else if (operation.equals("Update")) - { - RecordInfo info = ImportJournal.parseRecord(lineProperties); - journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false); - } - else if (operation.equals("DeleteRecord")) - { - long id = ImportJournal.parseLong("id", lineProperties); - - // If not found it means the append/update records were reclaimed already - if (journalRecords.get(id) != null) - { - journal.appendDeleteRecord(id, false); - } - } - else if (operation.equals("DeleteRecordTX")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - long id = ImportJournal.parseLong("id", lineProperties); - AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); - counter.incrementAndGet(); - - // If not found it means the append/update records were reclaimed already - if (journalRecords.get(id) != null) - { - journal.appendDeleteRecordTransactional(txID, id); - } - } - else if (operation.equals("Prepare")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties); - AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); - byte[] data = ImportJournal.parseEncoding("extraData", lineProperties); - - if (counter.get() == numberOfRecords) - { - journal.appendPrepareRecord(txID, data, false); - } - else - { - System.err.println("Transaction " + txID + - " at line " + - lineNumber + - " is incomplete. 
The prepare record expected " + - numberOfRecords + - " while the import only had " + - counter); - } - } - else if (operation.equals("Commit")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties); - AtomicInteger counter = ImportJournal.getCounter(txID, txCounters); - if (counter.get() == numberOfRecords) - { - journal.appendCommitRecord(txID, false); - } - else - { - System.err.println("Transaction " + txID + - " at line " + - lineNumber + - " is incomplete. The commit record expected " + - numberOfRecords + - " while the import only had " + - counter); - } - } - else if (operation.equals("Rollback")) - { - long txID = ImportJournal.parseLong("txID", lineProperties); - journal.appendRollbackRecord(txID, false); - } - else - { - System.err.println("Invalid opeartion " + operation + " at line " + lineNumber); - } - } - catch (Exception ex) - { - System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage()); - } - } - - journal.stop(); - } - - protected static AtomicInteger getCounter(final Long txID, final Map txCounters) - { - - AtomicInteger counter = txCounters.get(txID); - if (counter == null) - { - counter = new AtomicInteger(0); - txCounters.put(txID, counter); - } - - return counter; - } - - protected static RecordInfo parseRecord(final Properties properties) throws Exception - { - long id = ImportJournal.parseLong("id", properties); - byte userRecordType = ImportJournal.parseByte("userRecordType", properties); - boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties); - byte[] data = ImportJournal.parseEncoding("data", properties); - return new RecordInfo(id, userRecordType, data, isUpdate, (short)0); - } - - private static byte[] parseEncoding(final String name, final Properties properties) throws Exception - { - String value = ImportJournal.parseString(name, properties); - - return 
ImportJournal.decode(value); - } - - /** - * @param properties - * @return - */ - private static int parseInt(final String name, final Properties properties) throws Exception - { - String value = ImportJournal.parseString(name, properties); - - return Integer.parseInt(value); - } - - private static long parseLong(final String name, final Properties properties) throws Exception - { - String value = ImportJournal.parseString(name, properties); - - return Long.parseLong(value); - } - - private static boolean parseBoolean(final String name, final Properties properties) throws Exception - { - String value = ImportJournal.parseString(name, properties); - - return Boolean.parseBoolean(value); - } - - private static byte parseByte(final String name, final Properties properties) throws Exception - { - String value = ImportJournal.parseString(name, properties); - - return Byte.parseByte(value); - } - - /** - * @param name - * @param properties - * @return - * @throws Exception - */ - private static String parseString(final String name, final Properties properties) throws Exception - { - String value = properties.getProperty(name); - - if (value == null) - { - throw new Exception("property " + name + " not found"); - } - return value; - } - - protected static Properties parseLine(final String[] splitLine) - { - Properties properties = new Properties(); - - for (String el : splitLine) - { - String[] tuple = el.split("@"); - if (tuple.length == 2) - { - properties.put(tuple[0], tuple[1]); - } - else - { - properties.put(tuple[0], tuple[0]); - } - } - - return properties; - } - - private static byte[] decode(final String data) - { - return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes 
------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/activemq-tools/src/main/java/org/apache/activemq/tools/ExportJournal.java ---------------------------------------------------------------------- diff --git a/activemq-tools/src/main/java/org/apache/activemq/tools/ExportJournal.java b/activemq-tools/src/main/java/org/apache/activemq/tools/ExportJournal.java new file mode 100644 index 0000000..9e175b7 --- /dev/null +++ b/activemq-tools/src/main/java/org/apache/activemq/tools/ExportJournal.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.tools; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.List; + +import org.apache.activemq.core.journal.RecordInfo; +import org.apache.activemq.core.journal.SequentialFileFactory; +import org.apache.activemq.core.journal.impl.JournalFile; +import org.apache.activemq.core.journal.impl.JournalImpl; +import org.apache.activemq.core.journal.impl.JournalReaderCallback; +import org.apache.activemq.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.utils.Base64; + +/** + * Use this class to export the journal data. You can use it as a main class or through its static method {@link #exportJournal(String, String, String, int, int, String)} + *

+ * If you use the main method, use it as + *

+ * Example: java -cp activemq-tools*-jar-with-dependencies.jar export-journal /journalDir activemq-data amq 2 10485760 /tmp/export.dat + * + * @author Clebert Suconic + */ +public class ExportJournal +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + public void process(final String[] arg) + { + if (arg.length != 6) + { + for (int i = 0; i < arg.length; i++) + { + System.out.println("arg[" + i + "] = " + arg[i]); + } + printUsage(); + System.exit(-1); + } + + try + { + exportJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + + public static void exportJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final String fileOutput) throws Exception + { + + FileOutputStream fileOut = new FileOutputStream(new File(fileOutput)); + + BufferedOutputStream buffOut = new BufferedOutputStream(fileOut); + + PrintStream out = new PrintStream(buffOut); + + exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out); + + out.close(); + } + + public static void exportJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final PrintStream out) throws Exception + { + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null); + + JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + + List files = journal.orderFiles(); + + for (JournalFile file : files) + { + out.println("#File," + file); + + exportJournalFile(out, nio, file); + } + } + + /** 
+ * @param out + * @param fileFactory + * @param file + * @throws Exception + */ + public static void exportJournalFile(final PrintStream out, + final SequentialFileFactory fileFactory, + final JournalFile file) throws Exception + { + JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() + { + + public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo)); + } + + public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception + { + out.println("operation@Update," + describeRecord(recordInfo)); + } + + public void onReadRollbackRecord(final long transactionID) throws Exception + { + out.println("operation@Rollback,txID@" + transactionID); + } + + public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception + { + out.println("operation@Prepare,txID@" + transactionID + + ",numberOfRecords@" + + numberOfRecords + + ",extraData@" + + encode(extraData)); + } + + public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + out.println("operation@DeleteRecordTX,txID@" + transactionID + + "," + + describeRecord(recordInfo)); + } + + public void onReadDeleteRecord(final long recordID) throws Exception + { + out.println("operation@DeleteRecord,id@" + recordID); + } + + public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception + { + out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords); + } + + public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception + { + out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo)); + } + + public void onReadAddRecord(final RecordInfo recordInfo) throws Exception + { + 
out.println("operation@AddRecord," + describeRecord(recordInfo)); + } + + public void markAsDataFile(final JournalFile file) + { + } + }); + } + + private static String describeRecord(final RecordInfo recordInfo) + { + return "id@" + recordInfo.id + + ",userRecordType@" + + recordInfo.userRecordType + + ",length@" + + recordInfo.data.length + + ",isUpdate@" + + recordInfo.isUpdate + + ",compactCount@" + + recordInfo.compactCount + + ",data@" + + encode(recordInfo.data); + } + + private static String encode(final byte[] data) + { + return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + + public void printUsage() + { + for (int i = 0; i < 10; i++) + { + System.err.println(); + } + System.err.println("This method will export the journal at low level record."); + System.err.println(); + System.err.println(Main.USAGE + " export-journal "); + System.err.println(); + for (int i = 0; i < 10; i++) + { + System.err.println(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/activemq-tools/src/main/java/org/apache/activemq/tools/ImportJournal.java ---------------------------------------------------------------------- diff --git a/activemq-tools/src/main/java/org/apache/activemq/tools/ImportJournal.java b/activemq-tools/src/main/java/org/apache/activemq/tools/ImportJournal.java new file mode 100644 index 0000000..77a04fd --- /dev/null +++ b/activemq-tools/src/main/java/org/apache/activemq/tools/ImportJournal.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tools; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.core.journal.RecordInfo; +import org.apache.activemq.core.journal.impl.JournalImpl; +import org.apache.activemq.core.journal.impl.JournalRecord; +import org.apache.activemq.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.utils.Base64; + +/** + * Use this class to import the journal data from a listed file. You can use it as a main class or + * through its native method + * {@link #importJournal(String, String, String, int, int, String)} + *

+ * If you use the main method, use its arguments as: + * + *

+ * JournalDirectory JournalPrefix FileExtension MinFiles FileSize FileOutput
+ * 
+ *

+ * Example: + * + *

+ * java -cp activemq-core.jar org.apache.activemq.core.journal.impl.ExportJournal /journalDir activemq-data amq 2 10485760 /tmp/export.dat
+ * 
+ * @author Clebert Suconic + */ +public class ImportJournal +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + + public void process(final String[] arg) + { + for (int i = 0; i < arg.length; i++) + { + System.out.println("arg[" + i + "] = " + arg[i]); + } + if (arg.length != 6) + { + for (int i = 0; i < arg.length; i++) + { + System.out.println("arg[" + i + "] = " + arg[i]); + } + printUsage(); + System.exit(-1); + } + + try + { + importJournal(arg[1], arg[2], arg[3], 2, Integer.parseInt(arg[4]), arg[5]); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + + public static void importJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final String fileInput) throws Exception + { + FileInputStream fileInputStream = new FileInputStream(new File(fileInput)); + importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream); + + } + + public static void importJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final InputStream stream) throws Exception + { + Reader reader = new InputStreamReader(stream); + importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader); + } + + public static void importJournal(final String directory, + final String journalPrefix, + final String journalSuffix, + final int minFiles, + final int fileSize, + final Reader reader) throws Exception + { + + File journalDir = new File(directory); + + if (!journalDir.exists()) + { + if (!journalDir.mkdirs()) + System.err.println("Could not create directory " + 
directory); + } + + NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null); + + JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1); + + if (journal.orderFiles().size() != 0) + { + throw new IllegalStateException("Import needs to create a brand new journal"); + } + + journal.start(); + + // The journal is empty, as we checked already. Calling load just to initialize the internal data + journal.loadInternalOnly(); + + BufferedReader buffReader = new BufferedReader(reader); + + String line; + + HashMap txCounters = new HashMap(); + + long lineNumber = 0; + + Map journalRecords = journal.getRecords(); + + while ((line = buffReader.readLine()) != null) + { + lineNumber++; + String[] splitLine = line.split(","); + if (splitLine[0].equals("#File")) + { + txCounters.clear(); + continue; + } + + Properties lineProperties = parseLine(splitLine); + + String operation = null; + try + { + operation = lineProperties.getProperty("operation"); + + if (operation.equals("AddRecord")) + { + RecordInfo info = parseRecord(lineProperties); + journal.appendAddRecord(info.id, info.userRecordType, info.data, false); + } + else if (operation.equals("AddRecordTX")) + { + long txID = parseLong("txID", lineProperties); + AtomicInteger counter = getCounter(txID, txCounters); + counter.incrementAndGet(); + RecordInfo info = parseRecord(lineProperties); + journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); + } + else if (operation.equals("AddRecordTX")) + { + long txID = parseLong("txID", lineProperties); + AtomicInteger counter = getCounter(txID, txCounters); + counter.incrementAndGet(); + RecordInfo info = parseRecord(lineProperties); + journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); + } + else if (operation.equals("UpdateTX")) + { + long txID = parseLong("txID", lineProperties); + AtomicInteger counter = getCounter(txID, txCounters); + 
counter.incrementAndGet(); + RecordInfo info = parseRecord(lineProperties); + journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data); + } + else if (operation.equals("Update")) + { + RecordInfo info = parseRecord(lineProperties); + journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false); + } + else if (operation.equals("DeleteRecord")) + { + long id = parseLong("id", lineProperties); + + // If not found it means the append/update records were reclaimed already + if (journalRecords.get(id) != null) + { + journal.appendDeleteRecord(id, false); + } + } + else if (operation.equals("DeleteRecordTX")) + { + long txID = parseLong("txID", lineProperties); + long id = parseLong("id", lineProperties); + AtomicInteger counter = getCounter(txID, txCounters); + counter.incrementAndGet(); + + // If not found it means the append/update records were reclaimed already + if (journalRecords.get(id) != null) + { + journal.appendDeleteRecordTransactional(txID, id); + } + } + else if (operation.equals("Prepare")) + { + long txID = parseLong("txID", lineProperties); + int numberOfRecords = parseInt("numberOfRecords", lineProperties); + AtomicInteger counter = getCounter(txID, txCounters); + byte[] data = parseEncoding("extraData", lineProperties); + + if (counter.get() == numberOfRecords) + { + journal.appendPrepareRecord(txID, data, false); + } + else + { + System.err.println("Transaction " + txID + + " at line " + + lineNumber + + " is incomplete. 
The prepare record expected " + + numberOfRecords + + " while the import only had " + + counter); + } + } + else if (operation.equals("Commit")) + { + long txID = parseLong("txID", lineProperties); + int numberOfRecords = parseInt("numberOfRecords", lineProperties); + AtomicInteger counter = getCounter(txID, txCounters); + if (counter.get() == numberOfRecords) + { + journal.appendCommitRecord(txID, false); + } + else + { + System.err.println("Transaction " + txID + + " at line " + + lineNumber + + " is incomplete. The commit record expected " + + numberOfRecords + + " while the import only had " + + counter); + } + } + else if (operation.equals("Rollback")) + { + long txID = parseLong("txID", lineProperties); + journal.appendRollbackRecord(txID, false); + } + else + { + System.err.println("Invalid operation " + operation + " at line " + lineNumber); + } + } + catch (Exception ex) + { + System.err.println("Error at line " + lineNumber + ", operation=" + operation + " msg = " + ex.getMessage()); + } + } + + journal.stop(); + } + + protected static AtomicInteger getCounter(final Long txID, final Map txCounters) + { + + AtomicInteger counter = txCounters.get(txID); + if (counter == null) + { + counter = new AtomicInteger(0); + txCounters.put(txID, counter); + } + + return counter; + } + + protected static RecordInfo parseRecord(final Properties properties) throws Exception + { + long id = parseLong("id", properties); + byte userRecordType = parseByte("userRecordType", properties); + boolean isUpdate = parseBoolean("isUpdate", properties); + byte[] data = parseEncoding("data", properties); + return new RecordInfo(id, userRecordType, data, isUpdate, (short)0); + } + + private static byte[] parseEncoding(final String name, final Properties properties) throws Exception + { + String value = parseString(name, properties); + + return decode(value); + } + + /** + * @param properties + * @return + */ + private static int parseInt(final String name, final Properties properties) 
throws Exception + { + String value = parseString(name, properties); + + return Integer.parseInt(value); + } + + private static long parseLong(final String name, final Properties properties) throws Exception + { + String value = parseString(name, properties); + + return Long.parseLong(value); + } + + private static boolean parseBoolean(final String name, final Properties properties) throws Exception + { + String value = parseString(name, properties); + + return Boolean.parseBoolean(value); + } + + private static byte parseByte(final String name, final Properties properties) throws Exception + { + String value = parseString(name, properties); + + return Byte.parseByte(value); + } + + /** + * @param name + * @param properties + * @return + * @throws Exception + */ + private static String parseString(final String name, final Properties properties) throws Exception + { + String value = properties.getProperty(name); + + if (value == null) + { + throw new Exception("property " + name + " not found"); + } + return value; + } + + protected static Properties parseLine(final String[] splitLine) + { + Properties properties = new Properties(); + + for (String el : splitLine) + { + String[] tuple = el.split("@"); + if (tuple.length == 2) + { + properties.put(tuple[0], tuple[1]); + } + else + { + properties.put(tuple[0], tuple[0]); + } + } + + return properties; + } + + private static byte[] decode(final String data) + { + return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE); + } + + + public void printUsage() + { + for (int i = 0; i < 10; i++) + { + System.err.println(); + } + System.err.println("This method will export the journal at low level record."); + System.err.println(); + System.err.println(Main.USAGE + " import-journal "); + System.err.println(); + for (int i = 0; i < 10; i++) + { + System.err.println(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/activemq-tools/src/main/java/org/apache/activemq/tools/Main.java 
---------------------------------------------------------------------- diff --git a/activemq-tools/src/main/java/org/apache/activemq/tools/Main.java b/activemq-tools/src/main/java/org/apache/activemq/tools/Main.java index afbf4f4..9eee57f 100644 --- a/activemq-tools/src/main/java/org/apache/activemq/tools/Main.java +++ b/activemq-tools/src/main/java/org/apache/activemq/tools/Main.java @@ -25,7 +25,9 @@ public class Main private static final String PRINT_PAGES = "print-pages"; private static final String DATA_TOOL = "data-tool"; private static final String TRANSFER = "transfer-queue"; - private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "]"; + private static final String EXPORT_JOURNAL = "export-journal"; + private static final String IMPORT_JOURNAL = "import-journal"; + private static final String OPTIONS = " [" + IMPORT + "|" + EXPORT + "|" + PRINT_DATA + "|" + PRINT_PAGES + "|" + DATA_TOOL + "|" + TRANSFER + "|" + EXPORT_JOURNAL + "|" + IMPORT_JOURNAL + "]"; public static void main(String[] arg) throws Exception { @@ -36,7 +38,17 @@ public class Main } - if (TRANSFER.equals(arg[0])) + if (IMPORT_JOURNAL.equals(arg[0])) + { + ImportJournal tool = new ImportJournal(); + tool.process(arg); + } + else if (EXPORT_JOURNAL.equals(arg[0])) + { + ExportJournal tool = new ExportJournal(); + tool.process(arg); + } + else if (TRANSFER.equals(arg[0])) { TransferQueue tool = new TransferQueue(); tool.process(arg); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/journal/NIOJournalCompactTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/journal/NIOJournalCompactTest.java 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/journal/NIOJournalCompactTest.java index 992437c..e24905f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/journal/NIOJournalCompactTest.java @@ -38,7 +38,6 @@ import org.apache.activemq.core.journal.RecordInfo; import org.apache.activemq.core.journal.SequentialFile; import org.apache.activemq.core.journal.SequentialFileFactory; import org.apache.activemq.core.journal.impl.AbstractJournalUpdateTask; -import org.apache.activemq.core.journal.impl.ExportJournal; import org.apache.activemq.core.journal.impl.JournalCompactor; import org.apache.activemq.core.journal.impl.JournalFile; import org.apache.activemq.core.journal.impl.JournalFileImpl; @@ -50,6 +49,7 @@ import org.apache.activemq.core.server.impl.ServerMessageImpl; import org.apache.activemq.tests.unit.core.journal.impl.JournalImplTestBase; import org.apache.activemq.tests.unit.core.journal.impl.fakes.SimpleEncoding; import org.apache.activemq.tests.util.UnitTestCase; +import org.apache.activemq.tools.ExportJournal; import org.apache.activemq.utils.IDGenerator; import org.apache.activemq.utils.OrderedExecutorFactory; import org.apache.activemq.utils.SimpleIDGenerator; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java index fbd9e74..f61b900 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java 
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/persistence/ExportFormatTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.tests.integration.persistence; +import org.apache.activemq.tools.ExportJournal; +import org.apache.activemq.tools.ImportJournal; import org.junit.Ignore; import org.junit.Test; @@ -27,8 +29,6 @@ import org.apache.activemq.api.core.client.ClientProducer; import org.apache.activemq.api.core.client.ClientSession; import org.apache.activemq.api.core.client.ClientSessionFactory; import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.core.journal.impl.ExportJournal; -import org.apache.activemq.core.journal.impl.ImportJournal; import org.apache.activemq.core.server.ActiveMQServer; import org.apache.activemq.tests.util.ServiceTestBase; @@ -117,10 +117,12 @@ public class ExportFormatTest extends ServiceTestBase locator.close(); server.stop(); + System.out.println(); System.out.println("copy & paste the following as bindingsFile:"); ExportJournal.exportJournal(getBindingsDir(), "activemq-bindings", "bindings", 2, 1048576, System.out); + System.out.println(); System.out.println("copy & paste the following as dataFile:"); ExportJournal.exportJournal(getJournalDir(), "activemq-data", "amq", 2, 102400, System.out); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/tests/unit-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml index cc08555..5c22bb0 100644 --- a/tests/unit-tests/pom.xml +++ b/tests/unit-tests/pom.xml @@ -67,6 +67,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-tools</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/210222e2/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/journal/impl/JournalImplTestBase.java
---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/journal/impl/JournalImplTestBase.java index 5ed11a1..01a42e2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -15,6 +15,8 @@ * limitations under the License. */ package org.apache.activemq.tests.unit.core.journal.impl; +import org.apache.activemq.tools.ExportJournal; +import org.apache.activemq.tools.ImportJournal; import org.junit.Before; import org.junit.After; @@ -35,8 +37,6 @@ import org.apache.activemq.core.journal.PreparedTransactionInfo; import org.apache.activemq.core.journal.RecordInfo; import org.apache.activemq.core.journal.SequentialFileFactory; import org.apache.activemq.core.journal.TestableJournal; -import org.apache.activemq.core.journal.impl.ExportJournal; -import org.apache.activemq.core.journal.impl.ImportJournal; import org.apache.activemq.core.journal.impl.JournalImpl; import org.apache.activemq.tests.util.UnitTestCase; import org.apache.activemq.utils.ReusableLatch;