activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq-cli-tools git commit: https://issues.apache.org/jira/browse/AMQCLI-4
Date Mon, 13 Feb 2017 18:23:02 GMT
Repository: activemq-cli-tools
Updated Branches:
  refs/heads/master 8d58305d3 -> 3fee126f1


https://issues.apache.org/jira/browse/AMQCLI-4

Initial implementation of a Queue export functionality for KahaDB to
Artemis XML.  This still needs some work


Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/3fee126f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/3fee126f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/3fee126f

Branch: refs/heads/master
Commit: 3fee126f1b84fb75e8af39198291d229f469b311
Parents: 8d58305
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Wed Feb 8 16:02:21 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Mon Feb 13 13:22:56 2017 -0500

----------------------------------------------------------------------
 activemq-kahadb-exporter/pom.xml                |  24 ++
 .../schema/ArtemisJournalMarshaller.java        |  85 +++++-
 .../cli/kahadb/exporter/KahaDBExporter.java     |  91 +++++++
 .../kahadb/exporter/MessageStoreExporter.java   |  26 ++
 .../exporter/OpenWireExportConverter.java       |  28 ++
 .../ArtemisXmlMessageRecoveryListener.java      |  75 ++++++
 .../artemis/OpenWireMessageTypeConverter.java   | 140 ++++++++++
 .../main/resources/artemis-import-export.xjb    |   5 +-
 .../cli/kahadb/exporter/ExporterTest.java       | 269 ++++++++++++++++++-
 .../src/test/resources/log4j.properties         |   4 +-
 pom.xml                                         |  22 ++
 11 files changed, 763 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/pom.xml b/activemq-kahadb-exporter/pom.xml
index 1247661..9bcbb72 100644
--- a/activemq-kahadb-exporter/pom.xml
+++ b/activemq-kahadb-exporter/pom.xml
@@ -34,6 +34,14 @@
       <artifactId>activemq-kahadb-store</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>artemis-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>artemis-openwire-protocol</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -43,6 +51,21 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -120,6 +143,7 @@
               <extensionArgs>
                 <extensionArg>-Xfluent-builder</extensionArg>
               </extensionArgs>
+              <extension>true</extension>
             </xsdOption>
           </xsdOptions>
         </configuration>

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
index 82e167b..f1695e5 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
@@ -16,6 +16,11 @@
  */
 package org.apache.activemq.cli.artemis.schema;
 
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.regex.Pattern;
+
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
 import javax.xml.bind.JAXBException;
@@ -53,7 +58,9 @@ public class ArtemisJournalMarshaller {
         this.context = JAXBContext.newInstance(ObjectFactory.class);
         this.marshaller = context.createMarshaller();
         this.marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
-        this.xmlWriter = xmlWriter;
+
+        PrettyPrintHandler handler = new PrettyPrintHandler(xmlWriter);
+        this.xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(),
new Class[]{XMLStreamWriter.class}, handler);
     }
 
     public void appendJournalOpen() throws XMLStreamException {
@@ -97,4 +104,80 @@ public class ArtemisJournalMarshaller {
     private <T> JAXBElement<T> wrap(String name, T object) {
         return new JAXBElement<T>(QName.valueOf(name), (Class<T>) object.getClass(),
object);
     }
+
+    static class PrettyPrintHandler implements InvocationHandler {
+
+        private static final Pattern XML_CHARS = Pattern.compile( "[&<>]" );
+
+        private final XMLStreamWriter target;
+
+        private int depth = 0;
+
+        private static final char INDENT_CHAR = ' ';
+
+        private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+        boolean wrap = true;
+
+        PrettyPrintHandler(XMLStreamWriter target) {
+           this.target = target;
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
+           String m = method.getName();
+           boolean useCData = false;
+
+           switch (m) {
+              case "writeStartElement":
+                 target.writeCharacters(LINE_SEPARATOR);
+                 target.writeCharacters(indent(depth));
+
+                 depth++;
+                 break;
+              case "writeEndElement":
+                 depth--;
+                 if (wrap) {
+                    target.writeCharacters(LINE_SEPARATOR);
+                    target.writeCharacters(indent(depth));
+                 }
+                 wrap = true;
+                 break;
+              case "writeEmptyElement":
+              case "writeCData":
+                 target.writeCharacters(LINE_SEPARATOR);
+                 target.writeCharacters(indent(depth));
+                 break;
+              case "writeCharacters":
+                 useCData = XML_CHARS.matcher( (String)args[0] ).find();
+                 if (!useCData) {
+                     wrap = false;
+                     break;
+                 } else {
+                     target.writeCharacters(LINE_SEPARATOR);
+                     target.writeCharacters(indent(depth));
+                     break;
+                 }
+           }
+
+           if (useCData) {
+               Method cdata = XMLStreamWriter.class.getMethod("writeCData", String.class);
+               args[0] = ((String)args[0]).replace("<![CDATA[", "").replace("]]>",
"");
+               cdata.invoke(target, args);
+           } else {
+               method.invoke(target, args);
+           }
+
+           return null;
+        }
+
+        private String indent(int depth) {
+           depth *= 3; // level of indentation
+           char[] output = new char[depth];
+           while (depth-- > 0) {
+              output[depth] = INDENT_CHAR;
+           }
+           return new String(output);
+        }
+     }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
new file mode 100644
index 0000000..c178a8c
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IOExceptionSupport;
+
+public class KahaDBExporter implements MessageStoreExporter {
+
+    private final KahaDBPersistenceAdapter adapter;
+    private final MessageRecoveryListener recoveryListener;
+
+    public KahaDBExporter (final KahaDBPersistenceAdapter adapter,
+            final MessageRecoveryListener recoveryListener) {
+        this.adapter = adapter;
+        this.recoveryListener = recoveryListener;
+    }
+
+    @Override
+    public void exportQueues() throws IOException {
+
+        final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter(
+                dest -> dest.isQueue()).collect(Collectors.toSet());
+
+        // loop through all queues and export them
+        for (final ActiveMQDestination destination : destinations) {
+
+            final ActiveMQQueue queue = (ActiveMQQueue) destination;
+            final MessageStore messageStore = adapter.createQueueMessageStore(queue);
+
+            try {
+                // migrate the data
+                messageStore.recover(recoveryListener);
+            } catch (Exception e) {
+                IOExceptionSupport.create(e);
+            }
+        }
+    }
+
+    @Override
+    public void exportTopics() throws IOException {
+
+        final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter(
+                dest -> dest.isTopic()).collect(Collectors.toSet());
+
+        for (ActiveMQDestination destination : destinations) {
+            final ActiveMQTopic topic = (ActiveMQTopic) destination;
+            final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
+
+            //recover subscriptions
+            //TODO: This will most likely run into the same message more than once if there
is
+            //more than one durable sub on a topic so we should look at optimizing this
+            //Ideally we'd just recover all the messages once and then ask KahaDB which subscriptions
+            //have not acked the message.  This will probably require a new hook into KahaDB
+//            for (final SubscriptionInfo subscriptionInfo : messageStore.getAllSubscriptions())
{
+//
+//                try {
+//                    messageStore.recoverSubscription(subscriptionInfo.getClientId(),
+//                            subscriptionInfo.getSubscriptionName(), recoveryListener);
+//                } catch (Exception e) {
+//                    IOExceptionSupport.create(e);
+//                }
+//            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
new file mode 100644
index 0000000..b228e19
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.IOException;
+
+public interface MessageStoreExporter {
+
+    public void exportQueues() throws IOException;
+
+    public void exportTopics() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java
new file mode 100644
index 0000000..f364c20
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/OpenWireExportConverter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import org.apache.activemq.command.Message;
+
+/**
+ * Convert an OpenWire message to another format
+ */
+public interface OpenWireExportConverter <T> {
+
+    public T convert (final Message message) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
new file mode 100644
index 0000000..c2d04a2
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter.artemis;
+
+import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.schema.MessageType;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recovery listener that can be used to export messages to Artemis
+ */
+public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListener {
+    static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class);
+
+    private final ArtemisJournalMarshaller xmlMarshaller;
+    private final OpenWireMessageTypeConverter converter = new OpenWireMessageTypeConverter();
+
+
+    /**
+     * @param file
+     */
+    public ArtemisXmlMessageRecoveryListener(final ArtemisJournalMarshaller xmlMarshaller)
{
+        super();
+        this.xmlMarshaller = xmlMarshaller;
+    }
+
+
+    @Override
+    public boolean recoverMessage(Message message) throws Exception {
+        try {
+            MessageType messageType = converter.convert(message);
+            xmlMarshaller.appendMessage(messageType);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+
+    @Override
+    public boolean recoverMessageReference(MessageId ref) throws Exception {
+        return false;
+    }
+
+    @Override
+    public boolean hasSpace() {
+        return true;
+    }
+
+
+    @Override
+    public boolean isDuplicate(MessageId ref) {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
new file mode 100644
index 0000000..8d24e96
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter.artemis;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter;
+import org.apache.activemq.cli.schema.BodyType;
+import org.apache.activemq.cli.schema.MessageType;
+import org.apache.activemq.cli.schema.PropertiesType;
+import org.apache.activemq.cli.schema.PropertyType;
+import org.apache.activemq.cli.schema.QueueType;
+import org.apache.activemq.cli.schema.QueuesType;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.openwire.OpenWireFormat;
+
+public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType>
{
+
+    static final String MESSAGE_TIMESTAMP = "timestamp";
+    static final String DEFAULT_TYPE_PRETTY = "default";
+    static final String BYTES_TYPE_PRETTY = "bytes";
+    static final String MAP_TYPE_PRETTY = "map";
+    static final String OBJECT_TYPE_PRETTY = "object";
+    static final String STREAM_TYPE_PRETTY = "stream";
+    static final String TEXT_TYPE_PRETTY = "text";
+
+    final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message)
+     */
+    @Override
+    public MessageType convert(final Message message) throws Exception {
+        final ServerMessage serverMessage = converter.inbound(message);
+        final MessageType messageType = convertAttributes(serverMessage);
+
+        try {
+            if (!message.getProperties().isEmpty()) {
+                final PropertiesType propertiesType = new PropertiesType();
+                serverMessage.getPropertyNames().forEach(key -> {
+                    Object value = serverMessage.getObjectProperty(key);
+                    propertiesType.getProperty().add(PropertyType.builder()
+                            .withName(key.toString())
+                            .withValueAttribute(convertPropertyValue(value))
+                            .withType(convertPropertyType(value.getClass()))
+                            .build());
+                });
+                messageType.setProperties(propertiesType);
+            }
+
+            messageType.setQueues(convertQueue(message));
+            messageType.setBody(convertBody(serverMessage));
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+
+        return messageType;
+    }
+
+    private QueuesType convertQueue(final Message message) throws JMSException {
+
+        return QueuesType.builder()
+                .withQueue(QueueType.builder()
+                        .withName(message.getDestination().getPhysicalName()).build())
+            .build();
+    }
+
+    private BodyType convertBody(final ServerMessage serverMessage) throws Exception {
+        int size = serverMessage.getEndOfBodyPosition() - serverMessage.getBodyBuffer().readerIndex();
+        byte[] buffer = new byte[size];
+        serverMessage.getBodyBuffer().readBytes(buffer);
+        String value = encode(buffer);
+
+        //requires CDATA
+        return BodyType.builder()
+            .withValue("<![CDATA[" + value + "]]>")
+            .build();
+    }
+
+    private String convertPropertyValue(Object value) {
+        if (value instanceof byte[]) {
+            return encode((byte[]) value).toString();
+        }
+        return value.toString();
+    }
+
+    private MessageType convertAttributes(final ServerMessage message) {
+        MessageType messageType = MessageType.builder()
+                .withId(message.getMessageID())
+                .withTimestamp(message.getTimestamp())
+                .withPriority(message.getPriority()).build();
+
+        byte rawType = message.getType();
+        String prettyType = DEFAULT_TYPE_PRETTY;
+        if (rawType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+           prettyType = BYTES_TYPE_PRETTY;
+        } else if (rawType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
+           prettyType = MAP_TYPE_PRETTY;
+        } else if (rawType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
+           prettyType = OBJECT_TYPE_PRETTY;
+        } else if (rawType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
+           prettyType = STREAM_TYPE_PRETTY;
+        } else if (rawType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
+           prettyType = TEXT_TYPE_PRETTY;
+        }
+
+        messageType.setType(prettyType);
+        return messageType;
+    }
+
+    private String convertPropertyType(Class<?> clazz) {
+        if (clazz.equals(SimpleString.class)) {
+            return String.class.getSimpleName().toLowerCase();
+        }
+        return clazz.getSimpleName().toLowerCase();
+    }
+
+    private static String encode(final byte[] data) {
+        return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+     }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb b/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb
index 3f2eea3..ce7670e 100644
--- a/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb
+++ b/activemq-kahadb-exporter/src/main/resources/artemis-import-export.xjb
@@ -17,8 +17,8 @@
 -->
 <bindings xmlns="http://java.sun.com/xml/ns/jaxb" xmlns:xsi="http://www.w3.org/2000/10/XMLSchema-instance"
   xmlns:xs="http://www.w3.org/2001/XMLSchema"
-  extensionBindingPrefixes="xjc annox"
-  xmlns:annox="http://annox.dev.java.net" version="2.1">
+  xmlns:xjc="http://java.sun.com/xml/ns/jaxb/xjc"
+  extensionBindingPrefixes="xjc" version="2.1">
   <bindings schemaLocation="artemis-import-export.xsd" version="1.0">
 
     <!-- Customize the package name -->
@@ -32,5 +32,6 @@
         <property name="valueAttribute" />
       </bindings>
     </bindings>
+    
   </bindings>
 </bindings>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index df55a20..84edb33 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -16,11 +16,278 @@
  */
 package org.apache.activemq.cli.kahadb.exporter;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
+import org.apache.activemq.cli.schema.ActivemqJournalType;
+import org.apache.activemq.cli.schema.ObjectFactory;
+import org.apache.activemq.cli.schema.QueueBindingType;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ExporterTest {
 
+    static final Logger LOG = LoggerFactory.getLogger(ExporterTest.class);
+
+    @Rule
+    public TemporaryFolder storeFolder = new TemporaryFolder();
+
+    /**
+     * TODO Improve test when real exporting is done, for now this just
+     * tests that the recovery listener iterates over all the queue messages
+     *
+     * @throws Exception
+     */
     @Test
-    public void test() {
+    public void testExportQueues() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("test.queue");
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(storeFolder.newFolder());
+        adapter.start();
+        MessageStore messageStore = adapter.createQueueMessageStore(queue);
+        messageStore.start();
+
+        IdGenerator id = new IdGenerator();
+        ConnectionContext context = new ConnectionContext();
+        for (int i = 0; i < 5; i++) {
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Test");
+            message.setProperty("MyStringProperty", "abc");
+            message.setProperty("MyIntegerProperty", 1);
+            message.setDestination(queue);
+            message.setMessageId(new MessageId(id.generateId() + ":1", i));
+            messageStore.addMessage(context, message);
+        }
+        byte[] bytes = new byte[] {10, 11, 12};
+        for (int i = 0; i < 3; i++) {
+            ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+
+            message.setContent(new ByteSequence(bytes));
+            message.setProperty("MyStringProperty", "abc");
+            message.setProperty("MyByteProperty", (byte)10);
+            message.setDestination(queue);
+            message.setMessageId(new MessageId(id.generateId() + ":2", i));
+            messageStore.addMessage(context, message);
+        }
+
+        for (int i = 0; i < 3; i++) {
+            ActiveMQMapMessage message = new ActiveMQMapMessage();
+            message.setObject("key", "value");
+            message.setObject("key2", 10);
+            message.setProperty("MyStringProperty", "abc");
+            message.setDestination(queue);
+            message.setMessageId(new MessageId(id.generateId() + ":3", i));
+            messageStore.addMessage(context, message);
+        }
+
+        Date date = new Date();
+        for (int i = 0; i < 3; i++) {
+            ActiveMQObjectMessage message = new ActiveMQObjectMessage();
+            message.setObject(date);
+            message.setDestination(queue);
+            message.setMessageId(new MessageId(id.generateId() + ":4", i));
+            messageStore.addMessage(context, message);
+        }
+
+        for (int i = 0; i < 3; i++) {
+            ActiveMQStreamMessage message = new ActiveMQStreamMessage();
+            message.writeByte((byte)10);
+            message.storeContentAndClear();
+            message.setDestination(queue);
+            message.setMessageId(new MessageId(id.generateId() + ":5", i));
+            messageStore.addMessage(context, message);
+        }
+
+        messageStore.stop();
+
+        File file = storeFolder.newFile();
+        try(FileOutputStream fos = new FileOutputStream(file)) {
+            XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+            ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+            xmlMarshaller.appendJournalOpen();
+            xmlMarshaller.appendBindingsElement();
+            xmlMarshaller.appendBinding(QueueBindingType.builder()
+                    .withName("test.queue")
+                    .withAddress("test.queue").build());
+            xmlMarshaller.appendEndElement();
+            xmlMarshaller.appendMessagesElement();
+
+            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+                    new ArtemisXmlMessageRecoveryListener(xmlMarshaller));
+
+            dbExporter.exportQueues();
+            xmlMarshaller.appendJournalClose(true);
+        }
+
+
+        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+            String line = null;
+            while ((line = br.readLine()) != null) {
+                System.out.println(line);
+            }
+         }
+
+
+        validate(file, 17);
+
+        final ActiveMQServer artemisServer = buildArtemisBroker();
+        artemisServer.start();
+
+        XmlDataImporter dataImporter = new XmlDataImporter();
+        dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
+
+        Connection connection = null;
+        try {
+
+            connection = cf.createConnection();
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(session.createQueue("test.queue"));
+
+            for (int i = 0; i < 5; i++) {
+                TextMessage messageReceived = (TextMessage) messageConsumer.receive(1000);
+                assertNotNull(messageReceived);
+                assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+                assertEquals("Test", messageReceived.getText());
+            }
+
+            for (int i = 0; i < 3; i++) {
+                BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(1000);
+                assertNotNull(messageReceived);
+                assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+                assertEquals((byte)10, messageReceived.getByteProperty("MyByteProperty"));
+                byte[] result = new byte[3];
+                messageReceived.readBytes(result);
+                assertArrayEquals(bytes, result);
+            }
+
+            for (int i = 0; i < 3; i++) {
+                MapMessage messageReceived = (MapMessage) messageConsumer.receive(1000);
+                assertNotNull(messageReceived);
+                assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+                assertEquals("value", messageReceived.getObject("key"));
+            }
+
+            for (int i = 0; i < 3; i++) {
+                ObjectMessage messageReceived = (ObjectMessage) messageConsumer.receive(1000);
+                assertNotNull(messageReceived);
+                assertEquals(date, messageReceived.getObject());
+            }
+
+            for (int i = 0; i < 3; i++) {
+                StreamMessage messageReceived = (StreamMessage) messageConsumer.receive(1000);
+                assertNotNull(messageReceived);
+                assertEquals((byte)10, messageReceived.readByte());
+            }
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+            cf.close();
+        }
+
+        artemisServer.stop();
     }
+
+    public ActiveMQServer buildArtemisBroker() throws IOException {
+        Configuration configuration = new ConfigurationImpl();
+
+        configuration.setPersistenceEnabled(true);
+        configuration.setSecurityEnabled(false);
+
+        Map<String, Object> connectionParams = new HashMap<String, Object>();
+        connectionParams.put(
+                org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
61400);
+
+        configuration.setBindingsDirectory(storeFolder.newFolder().getAbsolutePath());
+        configuration.setJournalDirectory(storeFolder.newFolder().getAbsolutePath());
+        configuration.setLargeMessagesDirectory(storeFolder.newFolder().getAbsolutePath());
+        configuration.setPagingDirectory(storeFolder.newFolder().getAbsolutePath());
+
+        configuration.addAcceptorConfiguration(
+                new TransportConfiguration(NettyAcceptorFactory.class.getName(), connectionParams));
+        configuration.addConnectorConfiguration("connector",
+                new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
+
+        configuration.addAddressConfiguration(new CoreAddressConfiguration()
+                .setName("test.queue")
+                .addRoutingType(RoutingType. ANYCAST)
+                .addQueueConfiguration(new CoreQueueConfiguration()
+                        .setAddress("test.queue")
+                        .setName("test.queue")
+                        .setRoutingType(RoutingType.ANYCAST))
+                );
+
+       return new ActiveMQServerImpl(configuration);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void validate(File file, int count) throws JAXBException {
+        JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class);
+        Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
+        JAXBElement<ActivemqJournalType> read = (JAXBElement<ActivemqJournalType>)
jaxbUnmarshaller.unmarshal(file);
+        assertEquals(count, read.getValue().getMessages().getMessage().size());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/activemq-kahadb-exporter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/test/resources/log4j.properties b/activemq-kahadb-exporter/src/test/resources/log4j.properties
index c1b759a..b9e69aa 100644
--- a/activemq-kahadb-exporter/src/test/resources/log4j.properties
+++ b/activemq-kahadb-exporter/src/test/resources/log4j.properties
@@ -18,9 +18,9 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out, stdout
+log4j.rootLogger=DEBUG, out, stdout
 
-#log4j.logger.org.apache.activemq.store.kahadb=TRACE
+log4j.logger.org.apache.activemq=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/3fee126f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dc06c89..9541e1b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,8 @@
 
     <activemq-version>5.14.3</activemq-version>
     <artemis-version>2.0.0-SNAPSHOT</artemis-version>
+    <slf4j-version>1.7.13</slf4j-version>
+    <log4j-version>1.2.17</log4j-version>
 
     <!-- Test dependency versions -->
     <junit-version>4.12</junit-version>
@@ -82,6 +84,11 @@
         <artifactId>artemis-cli</artifactId>
         <version>${artemis-version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.activemq</groupId>
+        <artifactId>artemis-openwire-protocol</artifactId>
+        <version>${artemis-version}</version>
+      </dependency>
 
       <!-- Test dependencies -->
       <dependency>
@@ -96,6 +103,21 @@
         <version>${mockito-version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-log4j12</artifactId>
+        <version>${slf4j-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>jcl-over-slf4j</artifactId>
+        <version>${slf4j-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>log4j</groupId>
+        <artifactId>log4j</artifactId>
+        <version>${log4j-version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 


Mime
View raw message