activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [07/11] activemq-artemis git commit: Moving artemis-tools to artemis-cli and improving the tooling
Date Mon, 04 May 2015 15:30:38 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
new file mode 100644
index 0000000..515323e
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
+import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.utils.Base64;
+
+@Command(name = "decode", description = "Decode a journal's internal format into a new journal
set of files")
+public class DecodeJournal implements Action
+{
+
+   @Option(name = "--directory", description = "The journal folder (default ../data/journal)")
+   public String directory = "../data/journal";
+
+   @Option(name = "--prefix", description = "The journal prefix (default activemq-datal)")
+   public String prefix = "activemq-data";
+
+   @Option(name = "--suffix", description = "The journal suffix (default amq)")
+   public String suffix = "amq";
+
+   @Option(name = "--file-size", description = "The journal size (default 10485760)")
+   public int size = 10485760;
+
+   @Arguments(description = "The input file name (default=exp.dmp)", required = true)
+   public String input;
+
+   public Object execute(ActionContext context) throws Exception
+   {
+      try
+      {
+         importJournal(directory, prefix, suffix, 2, size, input);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+      return null;
+   }
+
+
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileInput) throws Exception
+   {
+      FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
+      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+
+   }
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final InputStream stream) throws Exception
+   {
+      Reader reader = new InputStreamReader(stream);
+      importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+   }
+
+   public static void importJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final Reader reader) throws Exception
+   {
+
+      File journalDir = new File(directory);
+
+      if (!journalDir.exists())
+      {
+         if (!journalDir.mkdirs())
+            System.err.println("Could not create directory " + directory);
+      }
+
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix,
journalSuffix, 1);
+
+      if (journal.orderFiles().size() != 0)
+      {
+         throw new IllegalStateException("Import needs to create a brand new journal");
+      }
+
+      journal.start();
+
+      // The journal is empty, as we checked already. Calling load just to initialize the
internal data
+      journal.loadInternalOnly();
+
+      BufferedReader buffReader = new BufferedReader(reader);
+
+      String line;
+
+      HashMap<Long, AtomicInteger> txCounters = new HashMap<Long, AtomicInteger>();
+
+      long lineNumber = 0;
+
+      Map<Long, JournalRecord> journalRecords = journal.getRecords();
+
+      while ((line = buffReader.readLine()) != null)
+      {
+         lineNumber++;
+         String[] splitLine = line.split(",");
+         if (splitLine[0].equals("#File"))
+         {
+            txCounters.clear();
+            continue;
+         }
+
+         Properties lineProperties = parseLine(splitLine);
+
+         String operation = null;
+         try
+         {
+            operation = lineProperties.getProperty("operation");
+
+            if (operation.equals("AddRecord"))
+            {
+               RecordInfo info = parseRecord(lineProperties);
+               journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
+            }
+            else if (operation.equals("AddRecordTX"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               AtomicInteger counter = getCounter(txID, txCounters);
+               counter.incrementAndGet();
+               RecordInfo info = parseRecord(lineProperties);
+               journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            }
+            else if (operation.equals("AddRecordTX"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               AtomicInteger counter = getCounter(txID, txCounters);
+               counter.incrementAndGet();
+               RecordInfo info = parseRecord(lineProperties);
+               journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
+            }
+            else if (operation.equals("UpdateTX"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               AtomicInteger counter = getCounter(txID, txCounters);
+               counter.incrementAndGet();
+               RecordInfo info = parseRecord(lineProperties);
+               journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType,
info.data);
+            }
+            else if (operation.equals("Update"))
+            {
+               RecordInfo info = parseRecord(lineProperties);
+               journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
+            }
+            else if (operation.equals("DeleteRecord"))
+            {
+               long id = parseLong("id", lineProperties);
+
+               // If not found it means the append/update records were reclaimed already
+               if (journalRecords.get(id) != null)
+               {
+                  journal.appendDeleteRecord(id, false);
+               }
+            }
+            else if (operation.equals("DeleteRecordTX"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               long id = parseLong("id", lineProperties);
+               AtomicInteger counter = getCounter(txID, txCounters);
+               counter.incrementAndGet();
+
+               // If not found it means the append/update records were reclaimed already
+               if (journalRecords.get(id) != null)
+               {
+                  journal.appendDeleteRecordTransactional(txID, id);
+               }
+            }
+            else if (operation.equals("Prepare"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+               AtomicInteger counter = getCounter(txID, txCounters);
+               byte[] data = parseEncoding("extraData", lineProperties);
+
+               if (counter.get() == numberOfRecords)
+               {
+                  journal.appendPrepareRecord(txID, data, false);
+               }
+               else
+               {
+                  System.err.println("Transaction " + txID +
+                                     " at line " +
+                                     lineNumber +
+                                     " is incomplete. The prepare record expected " +
+                                     numberOfRecords +
+                                     " while the import only had " +
+                                     counter);
+               }
+            }
+            else if (operation.equals("Commit"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               int numberOfRecords = parseInt("numberOfRecords", lineProperties);
+               AtomicInteger counter = getCounter(txID, txCounters);
+               if (counter.get() == numberOfRecords)
+               {
+                  journal.appendCommitRecord(txID, false);
+               }
+               else
+               {
+                  System.err.println("Transaction " + txID +
+                                     " at line " +
+                                     lineNumber +
+                                     " is incomplete. The commit record expected " +
+                                     numberOfRecords +
+                                     " while the import only had " +
+                                     counter);
+               }
+            }
+            else if (operation.equals("Rollback"))
+            {
+               long txID = parseLong("txID", lineProperties);
+               journal.appendRollbackRecord(txID, false);
+            }
+            else
+            {
+               System.err.println("Invalid operation " + operation + " at line " + lineNumber);
+            }
+         }
+         catch (Exception ex)
+         {
+            System.err.println("Error at line " + lineNumber + ", operation=" + operation
+ " msg = " + ex.getMessage());
+         }
+      }
+
+      journal.stop();
+   }
+
+   protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger>
txCounters)
+   {
+
+      AtomicInteger counter = txCounters.get(txID);
+      if (counter == null)
+      {
+         counter = new AtomicInteger(0);
+         txCounters.put(txID, counter);
+      }
+
+      return counter;
+   }
+
+   protected static RecordInfo parseRecord(final Properties properties) throws Exception
+   {
+      long id = parseLong("id", properties);
+      byte userRecordType = parseByte("userRecordType", properties);
+      boolean isUpdate = parseBoolean("isUpdate", properties);
+      byte[] data = parseEncoding("data", properties);
+      return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
+   }
+
+   private static byte[] parseEncoding(final String name, final Properties properties) throws
Exception
+   {
+      String value = parseString(name, properties);
+
+      return decode(value);
+   }
+
+   /**
+    * @param properties
+    * @return
+    */
+   private static int parseInt(final String name, final Properties properties) throws Exception
+   {
+      String value = parseString(name, properties);
+
+      return Integer.parseInt(value);
+   }
+
+   private static long parseLong(final String name, final Properties properties) throws Exception
+   {
+      String value = parseString(name, properties);
+
+      return Long.parseLong(value);
+   }
+
+   private static boolean parseBoolean(final String name, final Properties properties) throws
Exception
+   {
+      String value = parseString(name, properties);
+
+      return Boolean.parseBoolean(value);
+   }
+
+   private static byte parseByte(final String name, final Properties properties) throws Exception
+   {
+      String value = parseString(name, properties);
+
+      return Byte.parseByte(value);
+   }
+
+   /**
+    * @param name
+    * @param properties
+    * @return
+    * @throws Exception
+    */
+   private static String parseString(final String name, final Properties properties) throws
Exception
+   {
+      String value = properties.getProperty(name);
+
+      if (value == null)
+      {
+         throw new Exception("property " + name + " not found");
+      }
+      return value;
+   }
+
+   protected static Properties parseLine(final String[] splitLine)
+   {
+      Properties properties = new Properties();
+
+      for (String el : splitLine)
+      {
+         String[] tuple = el.split("@");
+         if (tuple.length == 2)
+         {
+            properties.put(tuple[0], tuple[1]);
+         }
+         else
+         {
+            properties.put(tuple[0], tuple[0]);
+         }
+      }
+
+      return properties;
+   }
+
+   private static byte[] decode(final String data)
+   {
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+
+   public void printUsage()
+   {
+      for (int i = 0; i < 10; i++)
+      {
+         System.err.println();
+      }
+      System.err.println("This method will export the journal at low level record.");
+      System.err.println();
+      System.err.println();
+      for (int i = 0; i < 10; i++)
+      {
+         System.err.println();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
new file mode 100644
index 0000000..01f7ac5
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/EncodeJournal.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.utils.Base64;
+
+@Command(name = "encode", description = "Encode a set of journal files into an internal encoded
data format")
+public class EncodeJournal implements Action
+{
+
+   @Option(name = "--directory", description = "The journal folder (default ../data/journal)")
+   public String directory = "../data/journal";
+
+   @Option(name = "--prefix", description = "The journal prefix (default activemq-datal)")
+   public String prefix = "activemq-data";
+
+   @Option(name = "--suffix", description = "The journal suffix (default amq)")
+   public String suffix = "amq";
+
+   @Option(name = "--file-size", description = "The journal size (default 10485760)")
+   public int size = 10485760;
+
+
+   public Object execute(ActionContext context) throws Exception
+   {
+      try
+      {
+         exportJournal(directory, prefix, suffix, 2, size);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+      return null;
+   }
+
+
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize) throws Exception
+   {
+
+
+      exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, System.out);
+   }
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final String fileName) throws Exception
+   {
+      FileOutputStream fileOutputStream = new FileOutputStream(fileName);
+      BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
+      PrintStream out = new PrintStream(bufferedOutputStream);
+      try
+      {
+         exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+      }
+      finally
+      {
+         out.close();
+         fileOutputStream.close();
+      }
+
+
+   }
+
+   public static void exportJournal(final String directory,
+                                    final String journalPrefix,
+                                    final String journalSuffix,
+                                    final int minFiles,
+                                    final int fileSize,
+                                    final PrintStream out) throws Exception
+   {
+      NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, null);
+
+      JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix,
journalSuffix, 1);
+
+      List<JournalFile> files = journal.orderFiles();
+
+      for (JournalFile file : files)
+      {
+         out.println("#File," + file);
+
+         exportJournalFile(out, nio, file);
+      }
+   }
+
+   /**
+    * @param out
+    * @param fileFactory
+    * @param file
+    * @throws Exception
+    */
+   public static void exportJournalFile(final PrintStream out,
+                                        final SequentialFileFactory fileFactory,
+                                        final JournalFile file) throws Exception
+   {
+      JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+      {
+
+         public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo)
throws Exception
+         {
+            out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+         }
+
+         public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@Update," + describeRecord(recordInfo));
+         }
+
+         public void onReadRollbackRecord(final long transactionID) throws Exception
+         {
+            out.println("operation@Rollback,txID@" + transactionID);
+         }
+
+         public void onReadPrepareRecord(final long transactionID, final byte[] extraData,
final int numberOfRecords) throws Exception
+         {
+            out.println("operation@Prepare,txID@" + transactionID +
+                           ",numberOfRecords@" +
+                           numberOfRecords +
+                           ",extraData@" +
+                           encode(extraData));
+         }
+
+         public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo)
throws Exception
+         {
+            out.println("operation@DeleteRecordTX,txID@" + transactionID +
+                           "," +
+                           describeRecord(recordInfo));
+         }
+
+         public void onReadDeleteRecord(final long recordID) throws Exception
+         {
+            out.println("operation@DeleteRecord,id@" + recordID);
+         }
+
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords)
throws Exception
+         {
+            out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" +
numberOfRecords);
+         }
+
+         public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo)
throws Exception
+         {
+            out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+         }
+
+         public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
+         {
+            out.println("operation@AddRecord," + describeRecord(recordInfo));
+         }
+
+         public void markAsDataFile(final JournalFile file)
+         {
+         }
+      });
+   }
+
+   private static String describeRecord(final RecordInfo recordInfo)
+   {
+      return "id@" + recordInfo.id +
+         ",userRecordType@" +
+         recordInfo.userRecordType +
+         ",length@" +
+         recordInfo.data.length +
+         ",isUpdate@" +
+         recordInfo.isUpdate +
+         ",compactCount@" +
+         recordInfo.compactCount +
+         ",data@" +
+         encode(recordInfo.data);
+   }
+
+   private static String encode(final byte[] data)
+   {
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+
+   public void printUsage()
+   {
+      for (int i = 0; i < 10; i++)
+      {
+         System.err.println();
+      }
+      System.err.println("This method will export the journal at low level record.");
+      System.err.println();
+      System.err.println();
+      for (int i = 0; i < 10; i++)
+      {
+         System.err.println();
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
new file mode 100644
index 0000000..c739a01
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/HelpData.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.command.Help;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+
+/**
+ * Help action for the {@code data} command group: it asks the inherited {@link Help} support
+ * to print the help of the single command path {@code data}.
+ */
+public class HelpData extends Help implements Action
+{
+
+   /**
+    * Prints the help for {@code data}.
+    *
+    * @param context the action context (not used by this command)
+    * @return always {@code null}
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception
+   {
+      // The command path has exactly one element
+      final List<String> dataCommandPath = new ArrayList<>(1);
+      dataCommandPath.add("data");
+
+      help(global, dataCommandPath);
+
+      return null;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
new file mode 100644
index 0000000..a5e18c0
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.airlift.command.Command;
+import io.airlift.command.Option;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.cli.commands.Action;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+
+@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
+public class PrintData implements Action
+{
+   /** Separator line printed above and below every section title. */
+   private static final String BANNER = "********************************************";
+
+   @Option(name = "--bindings", description = "The folder used for bindings (default ../data/bindings)")
+   public String binding = "../data/bindings";
+
+   @Option(name = "--journal", description = "The folder used for messages journal (default ../data/journal)")
+   public String journal = "../data/journal";
+
+   @Option(name = "--paging", description = "The folder used for paging (default ../data/paging)")
+   public String paging = "../data/paging";
+
+
+   /**
+    * Runs {@link #printData(String, String, String)} with the configured folders.
+    *
+    * @param context the action context; it is not read by this implementation
+    * @return always {@code null}
+    */
+   @Override
+   public Object execute(ActionContext context) throws Exception
+   {
+      printData(binding, journal, paging);
+      return null;
+   }
+
+   /**
+    * Prints, to {@code System.out}, the server id (when a {@code server.lock} file exists in the
+    * messages directory), the bindings journal, the messages journal and the paging folders.
+    * <p>
+    * Failures are reported with {@code printStackTrace()} and never propagated. A failure while
+    * describing the bindings journal does not stop the run; a failure while describing the
+    * messages journal does, because the paging section needs the records read from it.
+    *
+    * @param bindingsDirectory folder holding the bindings journal
+    * @param messagesDirectory folder holding the messages journal (and, optionally, {@code server.lock})
+    * @param pagingDirectory   folder holding the paging stores
+    */
+   public static void printData(String bindingsDirectory, String messagesDirectory, String pagingDirectory)
+   {
+      printServerId(messagesDirectory);
+
+      printSection("B I N D I N G S  J O U R N A L");
+
+      try
+      {
+         DescribeJournal.describeBindingsJournal(bindingsDirectory);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+
+      System.out.println();
+      printSection("M E S S A G E S   J O U R N A L");
+
+      DescribeJournal describeJournal;
+      try
+      {
+         describeJournal = DescribeJournal.describeMessagesJournal(messagesDirectory);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return;
+      }
+
+      System.out.println();
+      printSection("P A G I N G");
+
+      // printPages reports its own failures, so no additional try/catch is needed here.
+      printPages(pagingDirectory, describeJournal);
+   }
+
+   /** Prints {@code title} framed by two banner lines. */
+   private static void printSection(String title)
+   {
+      System.out.println(BANNER);
+      System.out.println(title);
+      System.out.println(BANNER);
+   }
+
+   /** Prints the node id read through a {@link FileLockNodeManager} when {@code server.lock} exists. */
+   private static void printServerId(String messagesDirectory)
+   {
+      File serverLockFile = new File(messagesDirectory, "server.lock");
+
+      if (!serverLockFile.isFile())
+      {
+         return;
+      }
+
+      try
+      {
+         FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
+         fileLock.start();
+         try
+         {
+            printSection("Server's ID=" + fileLock.getNodeId().toString());
+         }
+         finally
+         {
+            // Stop the node manager even when reading the id fails.
+            fileLock.stop();
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+
+   /**
+    * Prints every message found in every paging store under {@code pageDirectory}, annotated with
+    * the cursor information computed from the messages journal records.
+    * <p>
+    * Any exception is printed and swallowed. The executors created for the paging manager are
+    * always shut down before returning so their threads do not outlive this call.
+    *
+    * @param pageDirectory   folder holding the paging stores
+    * @param describeJournal result of describing the messages journal; its records supply the
+    *                        cursor ACKs, complete pages and page transactions
+    */
+   private static void printPages(String pageDirectory, DescribeJournal describeJournal)
+   {
+      final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+      final ExecutorService executor = Executors.newFixedThreadPool(10);
+      try
+      {
+
+         PageCursorsInfo cursorACKs = calculateCursorsInfo(describeJournal.getRecords());
+
+         ExecutorFactory execfactory = new ExecutorFactory()
+         {
+            @Override
+            public Executor getExecutor()
+            {
+               return executor;
+            }
+         };
+         final StorageManager sm = new NullStorageManager();
+         PagingStoreFactory pageStoreFactory =
+            new PagingStoreFactoryNIO(sm, pageDirectory, 1000L, scheduled, execfactory, false, null);
+         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+         addressSettingsRepository.setDefault(new AddressSettings());
+         PagingManager manager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
+
+         manager.start();
+
+         SimpleString[] stores = manager.getStoreNames();
+
+         for (SimpleString store : stores)
+         {
+            PagingStore pgStore = manager.getPageStore(store);
+            String folder = pgStore != null ? pgStore.getFolder() : null;
+
+            System.out.println("####################################################################################################");
+            System.out.println("Exploring store " + store + " folder = " + folder);
+
+            if (pgStore == null)
+            {
+               // Nothing to explore; move on instead of failing the whole dump with an NPE.
+               continue;
+            }
+
+            printStorePages(pgStore, sm, cursorACKs);
+         }
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         executor.shutdownNow();
+         scheduled.shutdownNow();
+      }
+   }
+
+   /**
+    * Prints every page of one store, starting at the store's first page id and visiting
+    * {@code getNumberOfPages()} consecutive ids.
+    *
+    * @param pgStore    the store to explore
+    * @param sm         storage manager passed to {@code Page.read} and {@code initMessage}
+    * @param cursorACKs cursor information used to flag ACKed messages, complete pages and
+    *                   page transactions that were not found
+    * @throws Exception if opening, reading or closing a page fails
+    */
+   private static void printStorePages(PagingStore pgStore, StorageManager sm, PageCursorsInfo cursorACKs) throws Exception
+   {
+      Set<Long> pgTXs = cursorACKs.getPgTXs();
+
+      int pgid = (int) pgStore.getFirstPage();
+      for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
+      {
+         System.out.println("*******   Page " + pgid);
+         Page page = pgStore.createPage(pgid);
+         page.open();
+         List<PagedMessage> msgs;
+         try
+         {
+            msgs = page.read(sm);
+         }
+         finally
+         {
+            // Close the page file even when reading it fails.
+            page.close();
+         }
+
+         int msgID = 0;
+
+         for (PagedMessage msg : msgs)
+         {
+            msg.initMessage(sm);
+            System.out.print("pg=" + pgid + ", msg=" + msgID + ",pgTX=" + msg.getTransactionID() +
+                                ",userMessageID=" + (msg.getMessage().getUserID() != null ? msg.getMessage().getUserID() : "") +
+                                ", msg=" + msg.getMessage());
+            System.out.print(",Queues = ");
+            long[] q = msg.getQueueIDs();
+            for (int i = 0; i < q.length; i++)
+            {
+               System.out.print(q[i]);
+
+               PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+               Set<PagePosition> positions = cursorACKs.getCursorRecords().get(q[i]);
+               boolean acked = positions != null && positions.contains(posCheck);
+
+               if (acked)
+               {
+                  System.out.print(" (ACK)");
+               }
+
+               if (cursorACKs.getCompletePages(q[i]).contains(Long.valueOf(pgid)))
+               {
+                  System.out.println(" (PG-COMPLETE)");
+               }
+
+
+               if (i + 1 < q.length)
+               {
+                  System.out.print(",");
+               }
+            }
+            if (msg.getTransactionID() >= 0 && !pgTXs.contains(msg.getTransactionID()))
+            {
+               System.out.print(", **PG_TX_NOT_FOUND**");
+            }
+            System.out.println();
+            msgID++;
+         }
+
+         pgid++;
+
+      }
+   }
+
+
+   /**
+    * Calculate the acks on the page system.
+    * <p>
+    * Scans the given journal records and collects, by record type: the acknowledged cursor
+    * positions per queue ({@code ACKNOWLEDGE_CURSOR}), the pages marked complete per queue
+    * ({@code PAGE_CURSOR_COMPLETE}) and the ids of the page transactions ({@code PAGE_TRANSACTION}).
+    * Records of any other type are ignored. A page reported complete twice for the same queue is
+    * noted on {@code System.err}.
+    *
+    * @param records the journal records to scan
+    * @return the collected cursor information
+    * @throws Exception declared for decoding failures; nothing is thrown explicitly here
+    */
+   protected static PageCursorsInfo calculateCursorsInfo(List<RecordInfo> records) throws Exception
+   {
+
+      PageCursorsInfo cursorInfo = new PageCursorsInfo();
+
+
+      for (RecordInfo record : records)
+      {
+         byte[] data = record.data;
+
+         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+
+         if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR)
+         {
+            JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+            encoding.decode(buff);
+
+            Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
+
+            if (set == null)
+            {
+               set = new HashSet<PagePosition>();
+               cursorInfo.getCursorRecords().put(encoding.queueID, set);
+            }
+
+            set.add(encoding.position);
+         }
+         else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE)
+         {
+            JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+            encoding.decode(buff);
+
+            Long queueID = Long.valueOf(encoding.queueID);
+            Long pageNR = Long.valueOf(encoding.position.getPageNr());
+
+            if (!cursorInfo.getCompletePages(queueID).add(pageNR))
+            {
+               System.err.println("Page " + pageNR + " has been already set as complete on queue " + queueID);
+            }
+         }
+         else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION)
+         {
+            if (record.isUpdate)
+            {
+               JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
+
+               pageUpdate.decode(buff);
+               cursorInfo.getPgTXs().add(pageUpdate.pageTX);
+            }
+            else
+            {
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+               pageTransactionInfo.decode(buff);
+
+               pageTransactionInfo.setRecordID(record.id);
+               cursorInfo.getPgTXs().add(pageTransactionInfo.getTransactionID());
+            }
+         }
+      }
+
+      return cursorInfo;
+   }
+
+
+   /**
+    * Mutable holder for the cursor-related facts gathered by
+    * {@link PrintData#calculateCursorsInfo(List)}. The getters expose the live internal
+    * collections, which callers fill in directly.
+    */
+   private static class PageCursorsInfo
+   {
+      /** Acknowledged positions, keyed by queue id. */
+      private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+      /** Ids of the page transactions seen in the journal. */
+      private final Set<Long> pgTXs = new HashSet<Long>();
+
+      /** Page numbers marked complete, keyed by queue id. */
+      private final Map<Long, Set<Long>> completePages = new HashMap<Long, Set<Long>>();
+
+      public PageCursorsInfo()
+      {
+      }
+
+
+      /**
+       * @return the pgTXs
+       */
+      public Set<Long> getPgTXs()
+      {
+         return pgTXs;
+      }
+
+
+      /**
+       * @return the cursorRecords
+       */
+      public Map<Long, Set<PagePosition>> getCursorRecords()
+      {
+         return cursorRecords;
+      }
+
+
+      /**
+       * @return the completePages
+       */
+      public Map<Long, Set<Long>> getCompletePages()
+      {
+         return completePages;
+      }
+
+      /**
+       * Returns the set of complete pages for a queue, creating and registering an empty set
+       * on first access so the result is never {@code null}.
+       *
+       * @param queueID the queue id
+       * @return the live, mutable set of complete page numbers for {@code queueID}
+       */
+      public Set<Long> getCompletePages(Long queueID)
+      {
+         Set<Long> completePagesSet = completePages.get(queueID);
+
+         if (completePagesSet == null)
+         {
+            completePagesSet = new HashSet<Long>();
+            completePages.put(queueID, completePagesSet);
+         }
+
+         return completePagesSet;
+      }
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea3370b3/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
new file mode 100644
index 0000000..2923760
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataConstants.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools;
+
+/**
+ * The constants shared by <code>org.apache.activemq.tools.XmlDataImporter</code> and
+ * <code>org.apache.activemq.tools.XmlDataExporter</code>.
+ * <p>
+ * NOTE(review): the package named above differs from this class's own package; the importer and
+ * exporter have presumably moved as well - confirm and update the references.
+ * <p>
+ * This class only holds string constants and cannot be instantiated. All constants are
+ * package-private except {@link #JNDI_COMPATIBILITY_PREFIX}, which is public.
+ */
+public final class XmlDataConstants
+{
+   private XmlDataConstants()
+   {
+      // Utility
+   }
+
+   // Document-level and binding names (presumably XML element/attribute names - confirm against
+   // the importer/exporter).
+   static final String XML_VERSION = "1.0";
+   static final String DOCUMENT_PARENT = "activemq-journal";
+   static final String BINDINGS_PARENT = "bindings";
+   static final String BINDINGS_CHILD = "binding";
+   static final String BINDING_ADDRESS = "address";
+   static final String BINDING_FILTER_STRING = "filter-string";
+   static final String BINDING_QUEUE_NAME = "queue-name";
+   static final String BINDING_ID = "id";
+   static final String JMS_CONNECTION_FACTORY = "jms-connection-factory";
+   static final String JMS_CONNECTION_FACTORIES = "jms-connection-factories";
+
+   // Message names and the "pretty" message type names.
+   static final String MESSAGES_PARENT = "messages";
+   static final String MESSAGES_CHILD = "message";
+   static final String MESSAGE_ID = "id";
+   static final String MESSAGE_PRIORITY = "priority";
+   static final String MESSAGE_EXPIRATION = "expiration";
+   static final String MESSAGE_TIMESTAMP = "timestamp";
+   static final String DEFAULT_TYPE_PRETTY = "default";
+   static final String BYTES_TYPE_PRETTY = "bytes";
+   static final String MAP_TYPE_PRETTY = "map";
+   static final String OBJECT_TYPE_PRETTY = "object";
+   static final String STREAM_TYPE_PRETTY = "stream";
+   static final String TEXT_TYPE_PRETTY = "text";
+   static final String MESSAGE_TYPE = "type";
+   static final String MESSAGE_IS_LARGE = "isLarge";
+   static final String MESSAGE_USER_ID = "user-id";
+   static final String MESSAGE_BODY = "body";
+
+   // Message property and queue names, followed by the property type names.
+   static final String PROPERTIES_PARENT = "properties";
+   static final String PROPERTIES_CHILD = "property";
+   static final String PROPERTY_NAME = "name";
+   static final String PROPERTY_VALUE = "value";
+   static final String PROPERTY_TYPE = "type";
+   static final String QUEUES_PARENT = "queues";
+   static final String QUEUES_CHILD = "queue";
+   static final String QUEUE_NAME = "name";
+   static final String PROPERTY_TYPE_BOOLEAN = "boolean";
+   static final String PROPERTY_TYPE_BYTE = "byte";
+   static final String PROPERTY_TYPE_BYTES = "bytes";
+   static final String PROPERTY_TYPE_SHORT = "short";
+   static final String PROPERTY_TYPE_INTEGER = "integer";
+   static final String PROPERTY_TYPE_LONG = "long";
+   static final String PROPERTY_TYPE_FLOAT = "float";
+   static final String PROPERTY_TYPE_DOUBLE = "double";
+   static final String PROPERTY_TYPE_STRING = "string";
+   static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
+
+   // JMS connection factory setting names.
+   static final String JMS_CONNECTION_FACTORY_NAME = "name";
+   static final String JMS_CONNECTION_FACTORY_CLIENT_ID = "client-id";
+   static final String JMS_CONNECTION_FACTORY_CALL_FAILOVER_TIMEOUT = "call-failover-timeout";
+   static final String JMS_CONNECTION_FACTORY_CALL_TIMEOUT = "call-timeout";
+   static final String JMS_CONNECTION_FACTORY_CLIENT_FAILURE_CHECK_PERIOD = "client-failure-check-period";
+   static final String JMS_CONNECTION_FACTORY_CONFIRMATION_WINDOW_SIZE = "confirmation-window-size";
+   static final String JMS_CONNECTION_FACTORY_CONNECTION_TTL = "connection-ttl";
+   static final String JMS_CONNECTION_FACTORY_CONSUMER_MAX_RATE = "consumer-max-rate";
+   static final String JMS_CONNECTION_FACTORY_CONSUMER_WINDOW_SIZE = "consumer-window-size";
+   static final String JMS_CONNECTION_FACTORY_DISCOVERY_GROUP_NAME = "discovery-group-name";
+   static final String JMS_CONNECTION_FACTORY_DUPS_OK_BATCH_SIZE = "dups-ok-batch-size";
+   static final String JMS_CONNECTION_FACTORY_TYPE = "type";
+   static final String JMS_CONNECTION_FACTORY_GROUP_ID = "group-id";
+   static final String JMS_CONNECTION_FACTORY_LOAD_BALANCING_POLICY_CLASS_NAME = "load-balancing-policy-class-name";
+   static final String JMS_CONNECTION_FACTORY_MAX_RETRY_INTERVAL = "max-retry-interval";
+   static final String JMS_CONNECTION_FACTORY_MIN_LARGE_MESSAGE_SIZE = "min-large-message-size";
+   static final String JMS_CONNECTION_FACTORY_PRODUCER_MAX_RATE = "producer-max-rate";
+   static final String JMS_CONNECTION_FACTORY_PRODUCER_WINDOW_SIZE = "producer-window-size";
+   static final String JMS_CONNECTION_FACTORY_RECONNECT_ATTEMPTS = "reconnect-attempts";
+   static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL = "retry-interval";
+   static final String JMS_CONNECTION_FACTORY_RETRY_INTERVAL_MULTIPLIER = "retry-interval-multiplier";
+   static final String JMS_CONNECTION_FACTORY_SCHEDULED_THREAD_POOL_MAX_SIZE = "scheduled-thread-pool-max-size";
+   static final String JMS_CONNECTION_FACTORY_THREAD_POOL_MAX_SIZE = "thread-pool-max-size";
+   static final String JMS_CONNECTION_FACTORY_TRANSACTION_BATCH_SIZE = "transaction-batch-size";
+   static final String JMS_CONNECTION_FACTORY_CONNECTORS = "connectors";
+   static final String JMS_CONNECTION_FACTORY_CONNECTOR = "connector";
+   static final String JMS_CONNECTION_FACTORY_AUTO_GROUP = "auto-group";
+   static final String JMS_CONNECTION_FACTORY_BLOCK_ON_ACKNOWLEDGE = "block-on-acknowledge";
+   static final String JMS_CONNECTION_FACTORY_BLOCK_ON_DURABLE_SEND = "block-on-durable-send";
+   static final String JMS_CONNECTION_FACTORY_BLOCK_ON_NON_DURABLE_SEND = "block-on-non-durable-send";
+   static final String JMS_CONNECTION_FACTORY_CACHE_LARGE_MESSAGES_CLIENT = "cache-large-messages-client";
+   static final String JMS_CONNECTION_FACTORY_COMPRESS_LARGE_MESSAGES = "compress-large-messages";
+   static final String JMS_CONNECTION_FACTORY_FAILOVER_ON_INITIAL_CONNECTION = "failover-on-initial-connection";
+   static final String JMS_CONNECTION_FACTORY_HA = "ha";
+   static final String JMS_CONNECTION_FACTORY_PREACKNOWLEDGE = "preacknowledge";
+   static final String JMS_CONNECTION_FACTORY_USE_GLOBAL_POOLS = "use-global-pools";
+
+   // JMS destination names.
+   static final String JMS_DESTINATIONS = "jms-destinations";
+   static final String JMS_DESTINATION = "jms-destination";
+   static final String JMS_DESTINATION_NAME = "name";
+   static final String JMS_DESTINATION_SELECTOR = "selector";
+   static final String JMS_DESTINATION_TYPE = "type";
+
+   // JNDI entry names.
+   static final String JMS_JNDI_ENTRIES = "entries";
+   static final String JMS_JNDI_ENTRY = "entry";
+
+   // The only constant visible outside this package.
+   public static final String JNDI_COMPATIBILITY_PREFIX = "java:jboss/exported/";
+
+   // Marker string; presumably stands in for a null value in the exported data - TODO confirm.
+   static final String NULL = "_AMQ_NULL";
+}
\ No newline at end of file


Mime
View raw message