activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [34/51] [partial] https://issues.apache.org/jira/browse/OPENWIRE-1
Date Thu, 24 Jul 2014 14:23:23 GMT
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ExceptionSupport.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ExceptionSupport.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ExceptionSupport.java
new file mode 100644
index 0000000..3f113b1
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ExceptionSupport.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+/**
+ * Exception support class.
+ *
+ * Factory class for creating {@link JMSException} instances (and selected subclasses)
+ * either from a String message or by wrapping another, non-JMS, exception.
+ *
+ * Every exception produced here carries the supplied cause as its Java cause and, when
+ * the cause is an {@link Exception}, also as its JMS linked exception so that both
+ * {@link Throwable#getCause()} and {@link JMSException#getLinkedException()} agree.
+ *
+ * @since 1.0
+ */
+public final class ExceptionSupport {
+
+    private ExceptionSupport() {}
+
+    /**
+     * Creates a JMSException with the given message and cause.
+     *
+     * @param msg
+     *        the message to assign to the new exception.
+     * @param cause
+     *        the error that triggered the new exception, may be null.
+     *
+     * @return a new JMSException wrapping the given cause.
+     */
+    public static JMSException create(String msg, Throwable cause) {
+        return attachCause(new JMSException(msg), cause);
+    }
+
+    /**
+     * Creates a JMSException with the given message and cause.
+     *
+     * @param msg
+     *        the message to assign to the new exception.
+     * @param cause
+     *        the error that triggered the new exception, may be null.
+     *
+     * @return a new JMSException wrapping the given cause.
+     */
+    public static JMSException create(String msg, Exception cause) {
+        return attachCause(new JMSException(msg), cause);
+    }
+
+    /**
+     * Converts the given error into a JMSException.
+     *
+     * If the error is a JMSException, or its direct cause is one, that instance is returned
+     * unchanged. Otherwise a new JMSException is created whose message is the message of the
+     * cause, or its {@code toString()} value when the message is null or empty.
+     *
+     * @param cause
+     *        the error to convert, must not be null.
+     *
+     * @return a JMSException representing the given error.
+     */
+    public static JMSException create(Throwable cause) {
+        if (cause instanceof JMSException) {
+            return (JMSException) cause;
+        }
+        if (cause.getCause() instanceof JMSException) {
+            return (JMSException) cause.getCause();
+        }
+
+        return attachCause(new JMSException(messageFor(cause)), cause);
+    }
+
+    /**
+     * Converts the given error into a JMSException.
+     *
+     * If the error is a JMSException, or its direct cause is one, that instance is returned
+     * unchanged. Otherwise a new JMSException is created whose message is the message of the
+     * cause, or its {@code toString()} value when the message is null or empty.
+     *
+     * @param cause
+     *        the error to convert, must not be null.
+     *
+     * @return a JMSException representing the given error.
+     */
+    public static JMSException create(Exception cause) {
+        return create((Throwable) cause);
+    }
+
+    /**
+     * Wraps the given error in a new MessageEOFException.
+     *
+     * @param cause
+     *        the error to wrap, must not be null.
+     *
+     * @return a new MessageEOFException whose message is derived from the cause.
+     */
+    public static MessageEOFException createMessageEOFException(Exception cause) {
+        return attachCause(new MessageEOFException(messageFor(cause)), cause);
+    }
+
+    /**
+     * Wraps the given error in a new MessageFormatException.
+     *
+     * @param cause
+     *        the error to wrap, must not be null.
+     *
+     * @return a new MessageFormatException whose message is derived from the cause.
+     */
+    public static MessageFormatException createMessageFormatException(Throwable cause) {
+        return attachCause(new MessageFormatException(messageFor(cause)), cause);
+    }
+
+    /**
+     * Picks the text used as the message of a wrapping exception: the message of the
+     * cause, or its toString() value when it has no usable message.
+     */
+    private static String messageFor(Throwable cause) {
+        String msg = cause.getMessage();
+        if (msg == null || msg.length() == 0) {
+            msg = cause.toString();
+        }
+        return msg;
+    }
+
+    /**
+     * Records the cause on the given exception, both as the JMS linked exception (only
+     * possible when the cause is an Exception) and as the standard Java cause.
+     */
+    private static <T extends JMSException> T attachCause(T exception, Throwable cause) {
+        if (cause instanceof Exception) {
+            exception.setLinkedException((Exception) cause);
+        }
+        exception.initCause(cause);
+        return exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ObjectMessageInputStream.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ObjectMessageInputStream.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ObjectMessageInputStream.java
new file mode 100644
index 0000000..c6164a7
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/ObjectMessageInputStream.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An ObjectInputStream that resolves classes through several class loaders in turn: the
+ * thread context class loader, the class loader of the wrapped stream's class and finally
+ * the class loader that loaded this class.
+ *
+ * NOTE(review): no filtering is applied to the class names read from the stream, so any
+ * class visible to those loaders can be instantiated. Confirm that callers only feed this
+ * stream with trusted data.
+ */
+public class ObjectMessageInputStream extends ObjectInputStream {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ObjectMessageInputStream.class);
+    private static final ClassLoader FALLBACK_CLASS_LOADER = ObjectMessageInputStream.class.getClassLoader();
+
+    private final ClassLoader inLoader;
+
+    /**
+     * Creates a new stream that reads serialized objects from the given source.
+     *
+     * @param in
+     *        the stream to read from; the class loader of its class is remembered as a
+     *        secondary loader for class resolution.
+     *
+     * @throws IOException if the stream header cannot be read.
+     */
+    public ObjectMessageInputStream(InputStream in) throws IOException {
+        super(in);
+        inLoader = in.getClass().getClassLoader();
+    }
+
+    @Override
+    protected Class<?> resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        return load(classDesc.getName(), cl, inLoader);
+    }
+
+    @Override
+    protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        Class<?>[] cinterfaces = new Class[interfaces.length];
+        for (int i = 0; i < interfaces.length; i++) {
+            cinterfaces[i] = load(interfaces[i], cl);
+        }
+
+        try {
+            return Proxy.getProxyClass(cl, cinterfaces);
+        } catch (IllegalArgumentException e) {
+            // The context loader could not define the proxy, try the remaining loaders
+            // in order before giving up.
+            try {
+                return Proxy.getProxyClass(inLoader, cinterfaces);
+            } catch (IllegalArgumentException e1) {
+                // ignore, the fallback loader is tried next
+            }
+            try {
+                return Proxy.getProxyClass(FALLBACK_CLASS_LOADER, cinterfaces);
+            } catch (IllegalArgumentException e2) {
+                // ignore, reported below using the first failure as the cause
+            }
+
+            throw new ClassNotFoundException(
+                "Could not resolve proxy class for interfaces: " + java.util.Arrays.toString(interfaces), e);
+        }
+    }
+
+    /**
+     * Loads the named class, first checking the well known simple types, then each of the
+     * given class loaders in order and finally the fallback class loader.
+     *
+     * @param className
+     *        the name of the class to load.
+     * @param cl
+     *        the class loaders to try, in order, before the fallback loader.
+     *
+     * @return the loaded class, never null.
+     *
+     * @throws ClassNotFoundException if none of the loaders can find the class.
+     */
+    private Class<?> load(String className, ClassLoader... cl) throws ClassNotFoundException {
+        // check for simple types first
+        final Class<?> clazz = loadSimpleType(className);
+        if (clazz != null) {
+            LOG.trace("Loaded class: {} as simple type -> {}", className, clazz);
+            return clazz;
+        }
+
+        // try the different class loaders
+        for (ClassLoader loader : cl) {
+            LOG.trace("Attempting to load class: {} using classloader: {}", className, loader);
+            try {
+                Class<?> answer = Class.forName(className, false, loader);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Loaded class: {} using classloader: {} -> {}", new Object[] { className, loader, answer });
+                }
+                return answer;
+            } catch (ClassNotFoundException e) {
+                // not fatal, the next loader in the list gets its chance
+                LOG.trace("Class not found: {} using classloader: {}", className, loader);
+            }
+        }
+
+        // and then the fallback class loader
+        return Class.forName(className, false, FALLBACK_CLASS_LOADER);
+    }
+
+    /**
+     * Load a simple type
+     *
+     * Recognizes the names of the primitive types, their wrapper classes, String and a
+     * few common array types, with or without the {@code java.lang.} prefix.
+     *
+     * @param name
+     *        the name of the class to load
+     * @return the class or {@code null} if the name is not one of the known simple types
+     */
+    public static Class<?> loadSimpleType(String name) {
+        if ("java.lang.byte[]".equals(name) || "byte[]".equals(name)) {
+            return byte[].class;
+        } else if ("java.lang.Byte[]".equals(name) || "Byte[]".equals(name)) {
+            return Byte[].class;
+        } else if ("java.lang.Object[]".equals(name) || "Object[]".equals(name)) {
+            return Object[].class;
+        } else if ("java.lang.String[]".equals(name) || "String[]".equals(name)) {
+            return String[].class;
+            // and these are common as well
+        } else if ("java.lang.String".equals(name) || "String".equals(name)) {
+            return String.class;
+        } else if ("java.lang.Boolean".equals(name) || "Boolean".equals(name)) {
+            return Boolean.class;
+        } else if ("boolean".equals(name)) {
+            return boolean.class;
+        } else if ("java.lang.Integer".equals(name) || "Integer".equals(name)) {
+            return Integer.class;
+        } else if ("int".equals(name)) {
+            return int.class;
+        } else if ("java.lang.Long".equals(name) || "Long".equals(name)) {
+            return Long.class;
+        } else if ("long".equals(name)) {
+            return long.class;
+        } else if ("java.lang.Short".equals(name) || "Short".equals(name)) {
+            return Short.class;
+        } else if ("short".equals(name)) {
+            return short.class;
+        } else if ("java.lang.Byte".equals(name) || "Byte".equals(name)) {
+            return Byte.class;
+        } else if ("byte".equals(name)) {
+            return byte.class;
+        } else if ("java.lang.Float".equals(name) || "Float".equals(name)) {
+            return Float.class;
+        } else if ("float".equals(name)) {
+            return float.class;
+        } else if ("java.lang.Double".equals(name) || "Double".equals(name)) {
+            return Double.class;
+        } else if ("double".equals(name)) {
+            return double.class;
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConnection.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConnection.java
new file mode 100644
index 0000000..cc3edc4
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConnection.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.openwire.commands.ConnectionId;
+import org.apache.activemq.openwire.commands.ConnectionInfo;
+import org.apache.activemq.openwire.commands.ConsumerId;
+import org.apache.activemq.openwire.commands.LocalTransactionId;
+import org.apache.activemq.openwire.commands.RemoveInfo;
+import org.apache.activemq.openwire.commands.SessionId;
+import org.apache.activemq.openwire.commands.TransactionId;
+
+/**
+ * Encapsulates an ActiveMQ compatible OpenWire connection Id used to create instance
+ * of ConnectionId objects and provides methods for creating OpenWireSession instances
+ * that are children of this Connection.
+ */
+public class OpenWireConnection extends ConnectionInfo {
+
+    private static final OpenWireIdGenerator idGenerator = new OpenWireIdGenerator();
+
+    private SessionId connectionSessionId;
+
+    private final AtomicLong sessionIdGenerator = new AtomicLong(1);
+    private final AtomicLong consumerIdGenerator = new AtomicLong(1);
+    private final AtomicLong tempDestinationIdGenerator = new AtomicLong(1);
+    private final AtomicLong localTransactionIdGenerator = new AtomicLong(1);
+
+    /**
+     * Creates a fixed OpenWire Connection Id instance.
+     */
+    public OpenWireConnection() {
+        this(idGenerator.generateId());
+    }
+
+    /**
+     * Creates a fixed OpenWire Connection Id instance.
+     *
+     * @param connectionId
+     *        the set ConnectionId value that this class will use to seed new Session IDs.
+     */
+    public OpenWireConnection(String connectionId) {
+        this.connectionId = new ConnectionId(connectionId);
+    }
+
+    /**
+     * Creates a fixed OpenWire Connection Id instance.
+     *
+     * @param connectionId
+     *        the set ConnectionId value that this class will use to seed new Session IDs.
+     */
+    public OpenWireConnection(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    @Override
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the SessionId used for the internal Connection Session instance.
+     */
+    public SessionId getConnectionSessionId() {
+        if (this.connectionSessionId == null) {
+            synchronized (this) {
+                if (this.connectionSessionId == null) {
+                    this.connectionSessionId = new SessionId(connectionId, -1);
+                }
+            }
+        }
+
+        return this.connectionSessionId;
+    }
+
+    /**
+     * Creates a new SessionId for a Session instance that is rooted by this Connection
+     *
+     * @return the next logical SessionId for this ConnectionId instance.
+     */
+    public SessionId getNextSessionId() {
+        return new SessionId(connectionId, sessionIdGenerator.getAndIncrement());
+    }
+
+    /**
+     * Creates a new Transaction ID used for local transactions created from this Connection.
+     *
+     * @return a new TransactionId instance.
+     */
+    public TransactionId getNextLocalTransactionId() {
+        return new LocalTransactionId(connectionId, localTransactionIdGenerator.getAndIncrement());
+    }
+
+    /**
+     * Create a new Consumer Id for ConnectionConsumer instances.
+     *
+     * @returns a new ConsumerId valid for use in ConnectionConsumer instances.
+     */
+    public ConsumerId getNextConnectionConsumerId() {
+        return new ConsumerId(getConnectionSessionId(), consumerIdGenerator.getAndIncrement());
+    }
+
+    /**
+     * Creates a new Temporary Destination name based on the Connection ID.
+     *
+     * @returns a new String destination name used to create temporary destinations.
+     */
+    public String getNextTemporaryDestinationName() {
+        return connectionId.getValue() + ":" + tempDestinationIdGenerator.getAndIncrement();
+    }
+
+    /**
+     * Factory method for creating a ConnectionInfo command that contains the connection
+     * ID from this OpenWireConnection instance.
+     *
+     * @return a new ConnectionInfo that contains the proper connection Id.
+     */
+    public ConnectionInfo createConnectionInfo() {
+        return this.copy();
+    }
+
+    /**
+     * Factory method for creating a suitable RemoveInfo command that can be used to remove
+     * this connection from a Broker.
+     *
+     * @return a new RemoveInfo that properly references this connection's Id.
+     */
+    public RemoveInfo createRemoveInfo() {
+        return new RemoveInfo(getConnectionId());
+    }
+
+    /**
+     * Factory method for OpenWireSession instances
+     *
+     * @return a new OpenWireSession with the next logical session ID for this connection.
+     */
+    public OpenWireSession createOpenWireSession() {
+        return new OpenWireSession(connectionId, sessionIdGenerator.getAndIncrement());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConsumer.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConsumer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConsumer.java
new file mode 100644
index 0000000..19c53ff
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireConsumer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import org.apache.activemq.openwire.commands.ConsumerId;
+import org.apache.activemq.openwire.commands.ConsumerInfo;
+import org.apache.activemq.openwire.commands.OpenWireDestination;
+import org.apache.activemq.openwire.commands.RemoveInfo;
+
+/**
+ * A ConsumerInfo that remembers the OpenWireSession it was created from, so that
+ * delivery Ids can be drawn from that session and the commands needed to register or
+ * remove the consumer can be produced from one place.
+ */
+public class OpenWireConsumer extends ConsumerInfo {
+
+    private final OpenWireSession parent;
+
+    /**
+     * Builds a consumer identified by the supplied ConsumerId.
+     *
+     * @param parent
+     *        the OpenWireSession this consumer belongs to.
+     * @param consumerId
+     *        the Id assigned to this consumer.
+     */
+    public OpenWireConsumer(OpenWireSession parent, ConsumerId consumerId) {
+        super(consumerId);
+        this.parent = parent;
+    }
+
+    /**
+     * Builds a consumer whose state is copied from an existing ConsumerInfo.
+     *
+     * @param parent
+     *        the OpenWireSession this consumer belongs to.
+     * @param consumerInfo
+     *        the ConsumerInfo whose values are copied into the new instance.
+     */
+    public OpenWireConsumer(OpenWireSession parent, ConsumerInfo consumerInfo) {
+        this.parent = parent;
+        consumerInfo.copy(this);
+    }
+
+    /**
+     * @return the OpenWireSession that was given when this consumer was created.
+     */
+    public OpenWireSession getParent() {
+        return this.parent;
+    }
+
+    /**
+     * Asks the parent session for its next delivery Id.
+     *
+     * @return the next logical delivery Id for messages dispatched by the consumer.
+     */
+    public long getNextDeliveryId() {
+        final long nextDeliveryId = this.parent.getNextDeliveryId();
+        return nextDeliveryId;
+    }
+
+    @Override
+    public String toString() {
+        return this.consumerId.toString();
+    }
+
+    /**
+     * Produces a copy of this instance in its current state.
+     *
+     * @return a new ConsumerInfo instance that can be used to register a remote Consumer.
+     */
+    public ConsumerInfo createConsumerInfo() {
+        final ConsumerInfo snapshot = copy();
+        return snapshot;
+    }
+
+    /**
+     * Assigns the given destination to this consumer and then produces a copy of this
+     * instance. Note that the destination remains set on this consumer afterwards.
+     *
+     * @param destination
+     *        the target destination for the returned ConsumerInfo instance.
+     *
+     * @return a new ConsumerInfo instance that can be used to register a remote Consumer.
+     */
+    public ConsumerInfo createConsumerInfo(OpenWireDestination destination) {
+        setDestination(destination);
+        final ConsumerInfo snapshot = copy();
+        return snapshot;
+    }
+
+    /**
+     * Produces the RemoveInfo command that refers to this consumer's Id.
+     *
+     * @return a new RemoveInfo instance that can remove this consumer.
+     */
+    public RemoveInfo createRemoveInfo() {
+        final ConsumerId id = getConsumerId();
+        return new RemoveInfo(id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireIdGenerator.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireIdGenerator.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireIdGenerator.java
new file mode 100644
index 0000000..6af2d20
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireIdGenerator.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generator for Globally unique Strings.
+ *
+ * Each generated Id has the form {@code <prefix><stub><instance>:<sequence>} where the
+ * stub is computed once per class load from a locally bound port and the current time,
+ * the instance number distinguishes generators created in the same JVM and the sequence
+ * is incremented for every Id handed out.
+ */
+public class OpenWireIdGenerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenWireIdGenerator.class);
+    private static final String UNIQUE_STUB;
+    private static int instanceCount;
+    private static String hostName;
+
+    private final String seed;
+    private final AtomicLong sequence = new AtomicLong(1);
+    private final int length;
+
+    public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port";
+
+    static {
+        String stub = "";
+        boolean canAccessSystemProps = true;
+        try {
+            SecurityManager sm = System.getSecurityManager();
+            if (sm != null) {
+                sm.checkPropertiesAccess();
+            }
+        } catch (SecurityException se) {
+            canAccessSystemProps = false;
+        }
+
+        if (canAccessSystemProps) {
+            int idGeneratorPort = 0;
+            ServerSocket ss = null;
+            try {
+                idGeneratorPort = Integer.parseInt(System.getProperty(PROPERTY_IDGENERATOR_PORT, "0"));
+                LOG.trace("Using port {}", idGeneratorPort);
+                hostName = getLocalHostName();
+                // The bound local port together with the current time forms the stub.
+                ss = new ServerSocket(idGeneratorPort);
+                stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
+                // NOTE(review): presumably keeps the port held long enough that another
+                // process cannot obtain the same port and timestamp - confirm.
+                Thread.sleep(100);
+            } catch (Exception e) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("could not generate unique stub by using DNS and binding to local port", e);
+                } else {
+                    LOG.warn("could not generate unique stub by using DNS and binding to local port: {} {}", e.getClass().getCanonicalName(), e.getMessage());
+                }
+
+                // Restore interrupted state so higher level code can deal with
+                // it.
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            } finally {
+                if (ss != null) {
+                    try {
+                        ss.close();
+                    } catch (IOException ioe) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Closing the server socket failed", ioe);
+                        } else {
+                            LOG.warn("Closing the server socket failed due {}", ioe.getMessage());
+                        }
+                    }
+                }
+            }
+        }
+
+        if (hostName == null) {
+            hostName = "localhost";
+        }
+
+        hostName = sanitizeHostName(hostName);
+
+        // Fall back to a stub without a port when the socket based stub was not created.
+        if (stub.length() == 0) {
+            stub = "-1-" + System.currentTimeMillis() + "-";
+        }
+        UNIQUE_STUB = stub;
+    }
+
+    /**
+     * Construct an IdGenerator
+     *
+     * @param prefix
+     *        the text placed in front of every Id created by this generator.
+     */
+    public OpenWireIdGenerator(String prefix) {
+        final int instance;
+        // Lock on the class rather than on a String value to guard the shared counter.
+        synchronized (OpenWireIdGenerator.class) {
+            instance = instanceCount++;
+        }
+        this.seed = prefix + UNIQUE_STUB + instance + ":";
+        this.length = this.seed.length() + Long.toString(Long.MAX_VALUE).length();
+    }
+
+    /**
+     * Construct an IdGenerator using the prefix "ID:" followed by the local host name.
+     */
+    public OpenWireIdGenerator() {
+        this("ID:" + hostName);
+    }
+
+    /**
+     * As we have to find the hostname as a side-effect of generating a unique
+     * stub, we allow its easy retrieval here
+     *
+     * @return the local host name
+     */
+    public static String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * Generate a unique id
+     *
+     * @return a unique id
+     */
+    public synchronized String generateId() {
+        StringBuilder sb = new StringBuilder(length);
+        sb.append(seed);
+        sb.append(sequence.getAndIncrement());
+        return sb.toString();
+    }
+
+    /**
+     * Removes every character with a code of 127 or above from the given host name.
+     *
+     * @param hostName
+     *        the host name to clean up, must not be null.
+     *
+     * @return the given host name when nothing was removed, otherwise the cleaned copy.
+     */
+    public static String sanitizeHostName(String hostName) {
+        boolean changed = false;
+
+        StringBuilder sb = new StringBuilder();
+        for (char ch : hostName.toCharArray()) {
+            // only include ASCII chars
+            if (ch < 127) {
+                sb.append(ch);
+            } else {
+                changed = true;
+            }
+        }
+
+        if (changed) {
+            String newHost = sb.toString();
+            LOG.info("Sanitized hostname from: {} to: {}", hostName, newHost);
+            return newHost;
+        } else {
+            return hostName;
+        }
+    }
+
+    /**
+     * Generate a unique ID - that is friendly for a URL or file system
+     *
+     * @return a unique id
+     */
+    public String generateSanitizedId() {
+        String result = generateId();
+        result = result.replace(':', '-');
+        result = result.replace('_', '-');
+        result = result.replace('.', '-');
+        return result;
+    }
+
+    /**
+     * From a generated id - return the seed (i.e. minus the count)
+     *
+     * @param id
+     *        the generated identifier
+     * @return the seed, or the id itself when it has no ':' followed by further text
+     */
+    public static String getSeedFromId(String id) {
+        String result = id;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+            if (index > 0 && (index + 1) < id.length()) {
+                result = id.substring(0, index);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * From a generated id - return the generator count
+     *
+     * @param id
+     *        the generated identifier
+     * @return the count, or -1 when the id is null or has no ':' followed by further text
+     * @throws NumberFormatException if the text after the last ':' is not a number
+     */
+    public static long getSequenceFromId(String id) {
+        long result = -1;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+
+            if (index > 0 && (index + 1) < id.length()) {
+                String numStr = id.substring(index + 1, id.length());
+                result = Long.parseLong(numStr);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Does a proper compare on the IDs
+     *
+     * The seeds are compared first; only when they are equal are the sequence numbers
+     * compared. When either id is null the result is -1.
+     *
+     * @param id1
+     *        the first identifier
+     * @param id2
+     *        the second identifier
+     *
+     * @return 0 if equal, a positive value if id1 is greater than id2, otherwise a
+     *         negative value
+     */
+    public static int compare(String id1, String id2) {
+        int result = -1;
+        String seed1 = OpenWireIdGenerator.getSeedFromId(id1);
+        String seed2 = OpenWireIdGenerator.getSeedFromId(id2);
+        if (seed1 != null && seed2 != null) {
+            result = seed1.compareTo(seed2);
+            if (result == 0) {
+                long count1 = OpenWireIdGenerator.getSequenceFromId(id1);
+                long count2 = OpenWireIdGenerator.getSequenceFromId(id2);
+                // Compare explicitly: narrowing the long difference to an int could
+                // truncate to zero or flip the sign for widely separated sequences.
+                result = count1 < count2 ? -1 : (count1 == count2 ? 0 : 1);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Looks up the local host name, falling back to the name embedded in the failure
+     * message when the lookup itself fails.
+     *
+     * @return the local host name
+     * @throws UnknownHostException if no name can be determined
+     */
+    private static String getLocalHostName() throws UnknownHostException {
+        try {
+            return (InetAddress.getLocalHost()).getHostName();
+        } catch (UnknownHostException uhe) {
+            String host = uhe.getMessage(); // host = "hostname: hostname"
+            if (host != null) {
+                int colon = host.indexOf(':');
+                if (colon > 0) {
+                    return host.substring(0, colon);
+                }
+            }
+            throw uhe;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java
new file mode 100644
index 0000000..4189d12
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireMarshallingSupport.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+
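+/**
+ * Static helpers for marshalling and unmarshalling primitive values, maps and lists
+ * as type-tagged data on a DataOutput / DataInput. This includes a fixed version of
+ * the UTF8 encoding function, since some older JVM's UTF8 encoding function breaks
+ * when handling large strings.
+ */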
+public final class OpenWireMarshallingSupport {
+
+    // Type tags: the marshal methods in this class write one of these bytes ahead of
+    // each value and unmarshalPrimitive switches on it to decide how to read the value.
+    public static final byte NULL = 0;
+    public static final byte BOOLEAN_TYPE = 1;
+    public static final byte BYTE_TYPE = 2;
+    public static final byte CHAR_TYPE = 3;
+    public static final byte SHORT_TYPE = 4;
+    public static final byte INTEGER_TYPE = 5;
+    public static final byte LONG_TYPE = 6;
+    public static final byte DOUBLE_TYPE = 7;
+    public static final byte FLOAT_TYPE = 8;
+    public static final byte STRING_TYPE = 9;
+    public static final byte BYTE_ARRAY_TYPE = 10;
+    public static final byte MAP_TYPE = 11;
+    public static final byte LIST_TYPE = 12;
+    public static final byte BIG_STRING_TYPE = 13;
+
+    // All members are static; the private constructor prevents instantiation.
+    private OpenWireMarshallingSupport() {
+    }
+
+    /**
+     * Writes a map of primitive values to the given output.
+     *
+     * The entry count is written as an int (-1 for a null map), followed by each key
+     * via {@link DataOutput#writeUTF(String)} and its value via
+     * {@link #marshalPrimitive(DataOutput, Object)}.
+     *
+     * @param map
+     *        the map to write, may be null.
+     * @param out
+     *        the output to write to.
+     *
+     * @throws IOException if writing fails or a value cannot be marshalled.
+     */
+    public static void marshalPrimitiveMap(Map<String, Object> map, DataOutput out) throws IOException {
+        if (map == null) {
+            out.writeInt(-1);
+            return;
+        }
+
+        out.writeInt(map.size());
+        // Walk the entries directly so each key does not need a second lookup.
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            out.writeUTF(entry.getKey());
+            marshalPrimitive(out, entry.getValue());
+        }
+    }
+
+    /**
+     * Reads a primitive map, passing Integer.MAX_VALUE as the size limit to
+     * {@link #unmarshalPrimitiveMap(DataInput, int)}.
+     *
+     * @param in
+     *        the input to read from.
+     *
+     * @return the result of the delegated call.
+     *
+     * @throws IOException if the delegated call fails.
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInput in) throws IOException {
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Reads a primitive map, passing Integer.MAX_VALUE as the size limit and the
+     * given force flag to {@link #unmarshalPrimitiveMap(DataInput, int, boolean)}.
+     *
+     * @param in
+     *        the input to read from.
+     * @param force
+     *        forwarded unchanged to the delegated call.
+     *
+     * @return the result of the delegated call.
+     *
+     * @throws IOException if the delegated call fails.
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInput in, boolean force) throws IOException {
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE, force);
+    }
+
+    /**
+     * Reads a primitive map with the given size limit, passing force = false to
+     * {@link #unmarshalPrimitiveMap(DataInput, int, boolean)}.
+     *
+     * @param in
+     *        the input to read from.
+     * @param maxPropertySize
+     *        forwarded unchanged to the delegated call.
+     *
+     * @return the result of the delegated call.
+     *
+     * @throws IOException if the delegated call fails.
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInput in, int maxPropertySize) throws IOException {
+        return unmarshalPrimitiveMap(in, maxPropertySize, false);
+    }
+
+    /**
+     * Reads a map of primitive values: an int entry count followed by, for each
+     * entry, a key read with {@link DataInput#readUTF()} and a value read with
+     * {@link #unmarshalPrimitive(DataInput, boolean)}.
+     *
+     * @param in
+     *        the input to read from.
+     * @param maxPropertySize
+     *        the largest entry count that will be accepted.
+     * @param force
+     *        forwarded to {@link #unmarshalPrimitive(DataInput, boolean)} for every value.
+     *
+     * @return the decoded map, or null if a negative entry count was read.
+     *
+     * @throws IOException if reading fails or the entry count exceeds maxPropertySize.
+     */
+    public static Map<String, Object> unmarshalPrimitiveMap(DataInput in, int maxPropertySize, boolean force) throws IOException {
+        int size = in.readInt();
+        if (size > maxPropertySize) {
+            throw new IOException("Primitive map is larger than the allowed size: " + size);
+        }
+        if (size < 0) {
+            return null;
+        }
+
+        // The count comes straight from the input, so do not let it dictate the
+        // initial table size; the map still grows to hold every entry actually read.
+        Map<String, Object> rc = new HashMap<String, Object>(Math.min(size, 1024));
+        for (int i = 0; i < size; i++) {
+            String name = in.readUTF();
+            rc.put(name, unmarshalPrimitive(in, force));
+        }
+        return rc;
+    }
+
+    /**
+     * Writes a list of primitive values: the element count as an int, then each
+     * element via {@link #marshalPrimitive(DataOutput, Object)}.
+     *
+     * @param list
+     *        the list to write.
+     * @param out
+     *        the output to write to.
+     *
+     * @throws IOException if writing fails or an element cannot be marshalled.
+     */
+    public static void marshalPrimitiveList(List<Object> list, DataOutput out) throws IOException {
+        final int count = list.size();
+        out.writeInt(count);
+
+        for (final Object item : list) {
+            marshalPrimitive(out, item);
+        }
+    }
+
+    /**
+     * Reads a primitive list, passing force = false to
+     * {@link #unmarshalPrimitiveList(DataInput, boolean)}.
+     *
+     * @param in
+     *        the input to read from.
+     *
+     * @return the result of the delegated call.
+     *
+     * @throws IOException if the delegated call fails.
+     */
+    public static List<Object> unmarshalPrimitiveList(DataInput in) throws IOException {
+        return unmarshalPrimitiveList(in, false);
+    }
+
+    /**
+     * Reads a list of primitive values: an int element count followed by that many
+     * values, each read with {@link #unmarshalPrimitive(DataInput, boolean)}.
+     *
+     * @param in
+     *        the input to read from.
+     * @param force
+     *        forwarded to {@link #unmarshalPrimitive(DataInput, boolean)} for every element.
+     *
+     * @return the decoded list, never null.
+     *
+     * @throws IOException if reading fails or a negative element count was read.
+     */
+    public static List<Object> unmarshalPrimitiveList(DataInput in, boolean force) throws IOException {
+        int size = in.readInt();
+        if (size < 0) {
+            // Report a bad count as an I/O error rather than letting the ArrayList
+            // constructor throw an unchecked IllegalArgumentException.
+            throw new IOException("Primitive list has an invalid size: " + size);
+        }
+
+        // ArrayList allocates its backing array eagerly, so cap the initial capacity
+        // instead of trusting the count read from the input; the list still grows.
+        List<Object> answer = new ArrayList<Object>(Math.min(size, 1024));
+        for (int i = 0; i < size; i++) {
+            answer.add(unmarshalPrimitive(in, force));
+        }
+        return answer;
+    }
+
+    /**
+     * Writes a single value preceded by its type tag, dispatching on the value's
+     * runtime class. Maps and Lists are written with MAP_TYPE / LIST_TYPE followed by
+     * their marshalled contents; a UTF8Buffer is written as its string form.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write, may be null.
+     *
+     * @throws IOException if writing fails or the value is of an unsupported type.
+     */
+    @SuppressWarnings("unchecked")
+    public static void marshalPrimitive(DataOutput out, Object value) throws IOException {
+        if (value == null) {
+            marshalNull(out);
+            return;
+        }
+
+        final Class<?> type = value.getClass();
+        if (type == Boolean.class) {
+            marshalBoolean(out, ((Boolean) value).booleanValue());
+        } else if (type == Byte.class) {
+            marshalByte(out, ((Byte) value).byteValue());
+        } else if (type == Character.class) {
+            marshalChar(out, ((Character) value).charValue());
+        } else if (type == Short.class) {
+            marshalShort(out, ((Short) value).shortValue());
+        } else if (type == Integer.class) {
+            marshalInt(out, ((Integer) value).intValue());
+        } else if (type == Long.class) {
+            marshalLong(out, ((Long) value).longValue());
+        } else if (type == Float.class) {
+            marshalFloat(out, ((Float) value).floatValue());
+        } else if (type == Double.class) {
+            marshalDouble(out, ((Double) value).doubleValue());
+        } else if (type == byte[].class) {
+            marshalByteArray(out, (byte[]) value);
+        } else if (type == String.class) {
+            marshalString(out, (String) value);
+        } else if (type == UTF8Buffer.class) {
+            marshalString(out, value.toString());
+        } else if (value instanceof Map) {
+            // Map is tested before List, matching the order values are tagged in.
+            out.writeByte(MAP_TYPE);
+            marshalPrimitiveMap((Map<String, Object>) value, out);
+        } else if (value instanceof List) {
+            out.writeByte(LIST_TYPE);
+            marshalPrimitiveList((List<Object>) value, out);
+        } else {
+            throw new IOException("Object is not a primitive: " + value);
+        }
+    }
+
+    /**
+     * Reads a single tagged value, passing force = false to
+     * {@link #unmarshalPrimitive(DataInput, boolean)}.
+     *
+     * @param in
+     *        the input to read from.
+     *
+     * @return the result of the delegated call.
+     *
+     * @throws IOException if the delegated call fails.
+     */
+    public static Object unmarshalPrimitive(DataInput in) throws IOException {
+        return unmarshalPrimitive(in, false);
+    }
+
+    /**
+     * Reads a single value: one type tag byte followed by the data for that type.
+     *
+     * @param in
+     *        the input to read from.
+     * @param force
+     *        when true, STRING_TYPE and BIG_STRING_TYPE values are returned as
+     *        {@link String}; when false they are returned as {@link UTF8Buffer}
+     *        wrapping the bytes read. Nested maps and lists are always read with
+     *        force = true.
+     *
+     * @return the decoded value, or null for the NULL tag.
+     *
+     * @throws IOException if reading fails, the tag is unknown, or a byte array
+     *         length is negative.
+     */
+    public static Object unmarshalPrimitive(DataInput in, boolean force) throws IOException {
+        Object value = null;
+        byte type = in.readByte();
+        switch (type) {
+            case BYTE_TYPE:
+                value = Byte.valueOf(in.readByte());
+                break;
+            case BOOLEAN_TYPE:
+                value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
+                break;
+            case CHAR_TYPE:
+                value = Character.valueOf(in.readChar());
+                break;
+            case SHORT_TYPE:
+                value = Short.valueOf(in.readShort());
+                break;
+            case INTEGER_TYPE:
+                value = Integer.valueOf(in.readInt());
+                break;
+            case LONG_TYPE:
+                value = Long.valueOf(in.readLong());
+                break;
+            case FLOAT_TYPE:
+                // valueOf keeps this consistent with the other boxed types above.
+                value = Float.valueOf(in.readFloat());
+                break;
+            case DOUBLE_TYPE:
+                value = Double.valueOf(in.readDouble());
+                break;
+            case BYTE_ARRAY_TYPE: {
+                int length = in.readInt();
+                if (length < 0) {
+                    // Surface a bad length as an I/O error instead of an unchecked
+                    // NegativeArraySizeException.
+                    throw new IOException("Invalid byte array length: " + length);
+                }
+                byte[] data = new byte[length];
+                in.readFully(data);
+                value = data;
+                break;
+            }
+            case STRING_TYPE:
+                if (force) {
+                    value = in.readUTF();
+                } else {
+                    value = readUTF(in, in.readUnsignedShort());
+                }
+                break;
+            case BIG_STRING_TYPE: {
+                if (force) {
+                    value = readUTF8(in);
+                } else {
+                    value = readUTF(in, in.readInt());
+                }
+                break;
+            }
+            case MAP_TYPE:
+                value = unmarshalPrimitiveMap(in, true);
+                break;
+            case LIST_TYPE:
+                value = unmarshalPrimitiveList(in, true);
+                break;
+            case NULL:
+                value = null;
+                break;
+            default:
+                throw new IOException("Unknown primitive type: " + type);
+        }
+        return value;
+    }
+
+    /**
+     * Reads exactly {@code length} bytes from the input and wraps them, undecoded,
+     * in a UTF8Buffer.
+     *
+     * @param in
+     *        the input to read from.
+     * @param length
+     *        the number of bytes to read.
+     *
+     * @return a UTF8Buffer over the bytes read.
+     *
+     * @throws IOException if the bytes cannot be read in full.
+     */
+    public static UTF8Buffer readUTF(DataInput in, int length) throws IOException {
+        final byte[] raw = new byte[length];
+        in.readFully(raw);
+        final UTF8Buffer wrapped = new UTF8Buffer(raw);
+        return wrapped;
+    }
+
+    /**
+     * Writes the NULL type tag; no data follows it.
+     *
+     * @param out
+     *        the output to write to.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalNull(DataOutput out) throws IOException {
+        out.writeByte(NULL);
+    }
+
+    /**
+     * Writes the BOOLEAN_TYPE tag followed by the boolean value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalBoolean(DataOutput out, boolean value) throws IOException {
+        out.writeByte(BOOLEAN_TYPE);
+        out.writeBoolean(value);
+    }
+
+    /**
+     * Writes the BYTE_TYPE tag followed by the byte value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalByte(DataOutput out, byte value) throws IOException {
+        out.writeByte(BYTE_TYPE);
+        out.writeByte(value);
+    }
+
+    /**
+     * Writes the CHAR_TYPE tag followed by the char value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalChar(DataOutput out, char value) throws IOException {
+        out.writeByte(CHAR_TYPE);
+        out.writeChar(value);
+    }
+
+    /**
+     * Writes the SHORT_TYPE tag followed by the short value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalShort(DataOutput out, short value) throws IOException {
+        out.writeByte(SHORT_TYPE);
+        out.writeShort(value);
+    }
+
+    /**
+     * Writes the INTEGER_TYPE tag followed by the int value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalInt(DataOutput out, int value) throws IOException {
+        out.writeByte(INTEGER_TYPE);
+        out.writeInt(value);
+    }
+
+    /**
+     * Writes the LONG_TYPE tag followed by the long value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalLong(DataOutput out, long value) throws IOException {
+        out.writeByte(LONG_TYPE);
+        out.writeLong(value);
+    }
+
+    /**
+     * Writes the FLOAT_TYPE tag followed by the float value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalFloat(DataOutput out, float value) throws IOException {
+        out.writeByte(FLOAT_TYPE);
+        out.writeFloat(value);
+    }
+
+    /**
+     * Writes the DOUBLE_TYPE tag followed by the double value.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the value to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalDouble(DataOutput out, double value) throws IOException {
+        out.writeByte(DOUBLE_TYPE);
+        out.writeDouble(value);
+    }
+
+    /**
+     * Writes the whole array by delegating to
+     * {@link #marshalByteArray(DataOutput, byte[], int, int)} with offset 0 and the
+     * array's length.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the array to write.
+     *
+     * @throws IOException if the delegated call fails.
+     */
+    public static void marshalByteArray(DataOutput out, byte[] value) throws IOException {
+        marshalByteArray(out, value, 0, value.length);
+    }
+
+    /**
+     * Writes the BYTE_ARRAY_TYPE tag, then {@code length} as an int, then
+     * {@code length} bytes of {@code value} starting at {@code offset}.
+     *
+     * @param out
+     *        the output to write to.
+     * @param value
+     *        the array holding the bytes to write.
+     * @param offset
+     *        index of the first byte to write.
+     * @param length
+     *        number of bytes to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalByteArray(DataOutput out, byte[] value, int offset, int length) throws IOException {
+        out.writeByte(BYTE_ARRAY_TYPE);
+        out.writeInt(length);
+        out.write(value, offset, length);
+    }
+
+    /**
+     * Writes a string with either the STRING_TYPE tag and
+     * {@link DataOutput#writeUTF(String)}, or the BIG_STRING_TYPE tag and
+     * {@link #writeUTF8(DataOutput, String)}, depending on its length.
+     *
+     * @param out
+     *        the output to write to.
+     * @param s
+     *        the string to write.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void marshalString(DataOutput out, String s) throws IOException {
+        // If it's too big, out.writeUTF may not be able to write it out.
+        final boolean tooBigForWriteUTF = s.length() >= Short.MAX_VALUE / 4;
+        if (tooBigForWriteUTF) {
+            out.writeByte(BIG_STRING_TYPE);
+            writeUTF8(out, s);
+        } else {
+            out.writeByte(STRING_TYPE);
+            out.writeUTF(s);
+        }
+    }
+
+    /**
+     * Writes a string as a 4-byte big-endian byte count followed by the encoded
+     * characters, using one byte for 0x0001-0x007F, three bytes for chars above
+     * 0x07FF and two bytes for everything else. A null string is written as the
+     * int -1.
+     *
+     * @param dataOut
+     *        the output to write to.
+     * @param text
+     *        the string to write, may be null.
+     *
+     * @throws IOException if writing fails.
+     */
+    public static void writeUTF8(DataOutput dataOut, String text) throws IOException {
+        if (text == null) {
+            dataOut.writeInt(-1);
+            return;
+        }
+
+        final int strlen = text.length();
+
+        // First pass: work out how many bytes the encoded form needs.
+        int utflen = 0;
+        for (int i = 0; i < strlen; i++) {
+            final char ch = text.charAt(i);
+            if (ch >= 0x0001 && ch <= 0x007F) {
+                utflen += 1;
+            } else if (ch > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+        }
+
+        // The length prefix and payload share one buffer so they go out in one write.
+        final byte[] encoded = new byte[utflen + 4];
+        int pos = 0;
+        encoded[pos++] = (byte) ((utflen >>> 24) & 0xFF);
+        encoded[pos++] = (byte) ((utflen >>> 16) & 0xFF);
+        encoded[pos++] = (byte) ((utflen >>> 8) & 0xFF);
+        encoded[pos++] = (byte) (utflen & 0xFF);
+
+        // Second pass: emit the bytes for each char.
+        for (int i = 0; i < strlen; i++) {
+            final char ch = text.charAt(i);
+            if (ch >= 0x0001 && ch <= 0x007F) {
+                encoded[pos++] = (byte) ch;
+            } else if (ch > 0x07FF) {
+                encoded[pos++] = (byte) (0xE0 | ((ch >> 12) & 0x0F));
+                encoded[pos++] = (byte) (0x80 | ((ch >> 6) & 0x3F));
+                encoded[pos++] = (byte) (0x80 | (ch & 0x3F));
+            } else {
+                encoded[pos++] = (byte) (0xC0 | ((ch >> 6) & 0x1F));
+                encoded[pos++] = (byte) (0x80 | (ch & 0x3F));
+            }
+        }
+        dataOut.write(encoded);
+    }
+
+    /**
+     * Reads a string stored as a 4-byte byte count followed by one-, two- and
+     * three-byte encoded characters.
+     *
+     * @param dataIn
+     *        the input to read from.
+     *
+     * @return the decoded string, or null if the byte count read is negative.
+     *
+     * @throws IOException if reading fails.
+     * @throws UTFDataFormatException if the bytes are not a valid encoding.
+     */
+    public static String readUTF8(DataInput dataIn) throws IOException {
+        final int utflen = dataIn.readInt();
+        if (utflen < 0) {
+            return null;
+        }
+
+        final StringBuilder decoded = new StringBuilder(utflen);
+        final byte[] raw = new byte[utflen];
+        dataIn.readFully(raw, 0, utflen);
+
+        int pos = 0;
+        while (pos < utflen) {
+            final int lead = raw[pos] & 0xff;
+            // The top four bits of the lead byte select the sequence length.
+            final int prefix = lead >> 4;
+            if (prefix <= 7) {
+                /* 0xxxxxxx */
+                pos++;
+                decoded.append((char) lead);
+            } else if (prefix == 12 || prefix == 13) {
+                /* 110x xxxx 10xx xxxx */
+                pos += 2;
+                if (pos > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                final int second = raw[pos - 1];
+                if ((second & 0xC0) != 0x80) {
+                    throw new UTFDataFormatException();
+                }
+                decoded.append((char) (((lead & 0x1F) << 6) | (second & 0x3F)));
+            } else if (prefix == 14) {
+                /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                pos += 3;
+                if (pos > utflen) {
+                    throw new UTFDataFormatException();
+                }
+                final int second = raw[pos - 2];
+                final int third = raw[pos - 1];
+                if (((second & 0xC0) != 0x80) || ((third & 0xC0) != 0x80)) {
+                    throw new UTFDataFormatException();
+                }
+                decoded.append((char) (((lead & 0x0F) << 12) | ((second & 0x3F) << 6) | (third & 0x3F)));
+            } else {
+                /* 10xx xxxx, 1111 xxxx */
+                throw new UTFDataFormatException();
+            }
+        }
+        // The number of chars produced may be less than utflen
+        return decoded.toString();
+    }
+
+    /**
+     * Renders the given Properties in the text form produced by
+     * {@link Properties#store(java.io.OutputStream, String)}.
+     *
+     * @param props
+     *        the properties to render, may be null.
+     *
+     * @return the stored form as a string, or an empty string when props is null.
+     *
+     * @throws IOException if storing the properties fails.
+     */
+    public static String propertiesToString(Properties props) throws IOException {
+        String result = "";
+        if (props != null) {
+            DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream();
+            try {
+                props.store(dataOut, "");
+                // store(OutputStream, ...) always writes ISO 8859-1, so decode with
+                // that charset rather than whatever the platform default happens to be.
+                result = new String(dataOut.getData(), 0, dataOut.size(), "ISO-8859-1");
+            } finally {
+                dataOut.close();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Parses text in the {@link Properties} file format into a Properties object.
+     *
+     * @param str
+     *        the text to parse, may be null or empty.
+     *
+     * @return the parsed properties; empty when str is null or empty.
+     *
+     * @throws IOException if the text cannot be read.
+     */
+    public static Properties stringToProperties(String str) throws IOException {
+        Properties result = new Properties();
+        if (str != null && str.length() > 0) {
+            // Parse the characters directly. Going through str.getBytes() would encode
+            // with the platform charset while Properties.load(InputStream) always
+            // decodes ISO 8859-1, which corrupts non-ASCII values.
+            result.load(new java.io.StringReader(str));
+        }
+        return result;
+    }
+
+    /**
+     * Shortens text longer than 63 characters to its first 45 characters, then
+     * "...", then its last 12 characters. Shorter text is returned unchanged.
+     *
+     * @param text
+     *        the text to shorten.
+     *
+     * @return the original text, or the shortened form.
+     */
+    public static String truncate64(String text) {
+        final int length = text.length();
+        if (length <= 63) {
+            return text;
+        }
+
+        final String head = text.substring(0, 45);
+        final String tail = text.substring(length - 12);
+        return head + "..." + tail;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireProducer.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireProducer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireProducer.java
new file mode 100644
index 0000000..5952c2f
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireProducer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.openwire.commands.MessageId;
+import org.apache.activemq.openwire.commands.OpenWireDestination;
+import org.apache.activemq.openwire.commands.ProducerId;
+import org.apache.activemq.openwire.commands.ProducerInfo;
+import org.apache.activemq.openwire.commands.RemoveInfo;
+import org.apache.activemq.openwire.commands.SessionId;
+
+/**
+ * Encapsulates an ActiveMQ compatible OpenWire Producer ID and provides
+ * functionality used to generate message IDs for the producer.
+ */
+public class OpenWireProducer extends ProducerInfo {
+
+    private final OpenWireSession parent;
+
+    // Source of message sequence numbers; the first value handed out is 1.
+    private final AtomicLong messageSequence = new AtomicLong(1);
+
+    /**
+     * Creates a producer that belongs to the given session and uses the given ProducerId.
+     *
+     * @param parent
+     *        the OpenWireSession that owns the new producer.
+     * @param producerId
+     *        the ProducerId assigned to this instance.
+     */
+    public OpenWireProducer(OpenWireSession parent, ProducerId producerId) {
+        super(producerId);
+        this.parent = parent;
+    }
+
+    /**
+     * Creates a producer that belongs to the given session and is populated by
+     * copying the given ProducerInfo into this instance.
+     *
+     * @param parent
+     *        the OpenWireSession that owns the new producer.
+     * @param producerInfo
+     *        the ProducerInfo whose values are copied into this instance.
+     */
+    public OpenWireProducer(OpenWireSession parent, ProducerInfo producerInfo) {
+        this.parent = parent;
+        producerInfo.copy(this);
+    }
+
+    /**
+     * @return the SessionId reported by the parent session.
+     */
+    public SessionId getSessionId() {
+        return parent.getSessionId();
+    }
+
+    /**
+     * @return the OpenWireSession this producer was created with.
+     */
+    public OpenWireSession getParent() {
+        return this.parent;
+    }
+
+    /**
+     * Creates a MessageId from this producer's ProducerId and the next message
+     * sequence number, advancing the sequence.
+     *
+     * @return the next MessageId for this producer.
+     */
+    public MessageId getNextMessageId() {
+        final long sequence = messageSequence.getAndIncrement();
+        return new MessageId(producerId, sequence);
+    }
+
+    @Override
+    public String toString() {
+        return producerId.toString();
+    }
+
+    /**
+     * Creates a ProducerInfo by copying this instance.
+     *
+     * @return a copy of this instance as a ProducerInfo.
+     */
+    public ProducerInfo createProducerInfo() {
+        return copy();
+    }
+
+    /**
+     * Sets the given destination on this instance and then creates a ProducerInfo
+     * by copying it, so this instance keeps the destination afterwards.
+     *
+     * @param destination
+     *        the target destination for the returned ProducerInfo.
+     *
+     * @return a copy of this instance as a ProducerInfo.
+     */
+    public ProducerInfo createProducerInfo(OpenWireDestination destination) {
+        setDestination(destination);
+        return copy();
+    }
+
+    /**
+     * Creates a RemoveInfo command built from this producer's ProducerId.
+     *
+     * @return a new RemoveInfo for this producer's ProducerId.
+     */
+    public RemoveInfo createRemoveInfo() {
+        final ProducerId id = getProducerId();
+        return new RemoveInfo(id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireSession.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireSession.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireSession.java
new file mode 100644
index 0000000..1830dc4
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/OpenWireSession.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.openwire.commands.ConnectionId;
+import org.apache.activemq.openwire.commands.ConsumerId;
+import org.apache.activemq.openwire.commands.ProducerId;
+import org.apache.activemq.openwire.commands.RemoveInfo;
+import org.apache.activemq.openwire.commands.SessionId;
+import org.apache.activemq.openwire.commands.SessionInfo;
+
+/**
+ * Encapsulates an ActiveMQ compatible OpenWire Session ID and provides methods
+ * for creating consumer and producer ID objects that are children of this session.
+ */
+public class OpenWireSession extends SessionInfo {
+
+    // Independent counters for child IDs; each hands out 1 first.
+    private final AtomicLong consumerIdGenerator = new AtomicLong(1);
+    private final AtomicLong producerIdGenerator = new AtomicLong(1);
+    private final AtomicLong deliveryIdGenerator = new AtomicLong(1);
+
+    /**
+     * Creates a new OpenWireSession with the given SessionId.
+     *
+     * @param sessionId
+     *        the SessionId assigned to this instance.
+     */
+    public OpenWireSession(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * Creates a new OpenWireSession whose SessionId is built from the given
+     * ConnectionId and session sequence number.
+     *
+     * @param connectionId
+     *        the ConnectionId to build the SessionId from.
+     * @param sequence
+     *        the sequence number that identifies this session.
+     */
+    public OpenWireSession(ConnectionId connectionId, long sequence) {
+        this(new SessionId(connectionId, sequence));
+    }
+
+    /**
+     * @return the SessionId of this session.
+     */
+    @Override
+    public SessionId getSessionId() {
+        return this.sessionId;
+    }
+
+    /**
+     * @return a new ConsumerId built from this session's SessionId and the next
+     *         consumer sequence number.
+     */
+    public ConsumerId getNextConsumerId() {
+        final long next = consumerIdGenerator.getAndIncrement();
+        return new ConsumerId(sessionId, next);
+    }
+
+    /**
+     * @return a new ProducerId built from this session's SessionId and the next
+     *         producer sequence number.
+     */
+    public ProducerId getNextProducerId() {
+        final long next = producerIdGenerator.getAndIncrement();
+        return new ProducerId(sessionId, next);
+    }
+
+    /**
+     * @return the next delivery sequence number for this session.
+     */
+    public long getNextDeliveryId() {
+        return deliveryIdGenerator.getAndIncrement();
+    }
+
+    @Override
+    public String toString() {
+        return sessionId.toString();
+    }
+
+    /**
+     * Creates an OpenWireConsumer owned by this session, using the next ConsumerId.
+     *
+     * @return a new OpenWireConsumer whose parent is this session.
+     */
+    public OpenWireConsumer createOpenWireConsumer() {
+        return new OpenWireConsumer(this, getNextConsumerId());
+    }
+
+    /**
+     * Creates an OpenWireProducer owned by this session, using the next ProducerId.
+     *
+     * @return a new OpenWireProducer whose parent is this session.
+     */
+    public OpenWireProducer createOpenWireProducer() {
+        return new OpenWireProducer(this, getNextProducerId());
+    }
+
+    /**
+     * Creates a SessionInfo built from this session's SessionId.
+     *
+     * @return a new SessionInfo for this session's SessionId.
+     */
+    public SessionInfo createSessionInfo() {
+        final SessionId id = getSessionId();
+        return new SessionInfo(id);
+    }
+
+    /**
+     * Creates a RemoveInfo command built from this session's SessionId.
+     *
+     * @return a new RemoveInfo for this session's SessionId.
+     */
+    public RemoveInfo createRemoveInfo() {
+        final SessionId id = getSessionId();
+        return new RemoveInfo(id);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/UnresolvedDestinationTransformer.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/UnresolvedDestinationTransformer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/UnresolvedDestinationTransformer.java
new file mode 100644
index 0000000..252c758
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/UnresolvedDestinationTransformer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.openwire.commands.OpenWireDestination;
+
+/**
+ * Allows for the configuration of a user defined Destination transformer that will
+ * be called when it cannot be logically determined what type of Destination is
+ * being converted to an OpenWireDestination.  This can happen for instance when a
+ * JMS Provider's Destination implements both Topic and Queue from the same root
+ * Destination object.
+ */
+public interface UnresolvedDestinationTransformer {
+
+    /**
+     * Converts the given JMS Destination into the matching OpenWire destination.
+     *
+     * @param destination
+     *        the foreign destination to convert to the proper OpenWire type.
+     *
+     * @return an OpenWireDestination instance of the correct type.
+     *
+     * @throws JMSException if an error occurs while converting the Destination type.
+     */
+    OpenWireDestination transform(Destination destination) throws JMSException;
+
+    /**
+     * Converts the given JMS Destination name into the matching OpenWire destination.
+     *
+     * @param destination
+     *        the name of a destination to convert to the proper OpenWire type.
+     *
+     * @return an OpenWireDestination instance of the correct type.
+     *
+     * @throws JMSException if an error occurs while converting the Destination type.
+     */
+    OpenWireDestination transform(String destination) throws JMSException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/test/java/org/apache/activemq/openwire/codec/BooleanStreamTest.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/codec/BooleanStreamTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/codec/BooleanStreamTest.java
new file mode 100644
index 0000000..0bfb4c3
--- /dev/null
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/codec/BooleanStreamTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.AssertionFailedError;
+
+import org.apache.activemq.openwire.codec.BooleanStream;
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.commands.CommandTypes;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Round trip tests for the OpenWire BooleanStream class: booleans are written,
+ * marshalled, unmarshalled and read back for every stream length below
+ * {@link #numberOfBytes}.
+ */
+public class BooleanStreamTest {
+
+    protected OpenWireFormat openWireformat;
+    protected int endOfStreamMarker = 0x12345678;
+    int numberOfBytes = 8 * 200;
+
+    /**
+     * Supplies the boolean stored at position {@code index} of a stream that
+     * holds {@code count} values in total.
+     */
+    interface BooleanValueSet {
+        boolean getBooleanValueFor(int index, int count);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        openWireformat = createOpenWireFormat();
+    }
+
+    protected OpenWireFormat createOpenWireFormat() {
+        OpenWireFormat format = new OpenWireFormat(CommandTypes.PROTOCOL_VERSION);
+        format.setCacheEnabled(true);
+        format.setStackTraceEnabled(false);
+        format.setVersion(1);
+        return format;
+    }
+
+    @Test
+    public void testBooleanMarshallingUsingAllTrue() throws Exception {
+        BooleanValueSet allTrue = new BooleanValueSet() {
+            @Override
+            public boolean getBooleanValueFor(int index, int count) {
+                return true;
+            }
+        };
+        testBooleanStream(numberOfBytes, allTrue);
+    }
+
+    @Test
+    public void testBooleanMarshallingUsingAllFalse() throws Exception {
+        BooleanValueSet allFalse = new BooleanValueSet() {
+            @Override
+            public boolean getBooleanValueFor(int index, int count) {
+                return false;
+            }
+        };
+        testBooleanStream(numberOfBytes, allFalse);
+    }
+
+    @Test
+    public void testBooleanMarshallingUsingOddAlternateTrueFalse() throws Exception {
+        BooleanValueSet trueOnEvenIndex = new BooleanValueSet() {
+            @Override
+            public boolean getBooleanValueFor(int index, int count) {
+                return index % 2 == 0;
+            }
+        };
+        testBooleanStream(numberOfBytes, trueOnEvenIndex);
+    }
+
+    @Test
+    public void testBooleanMarshallingUsingEvenAlternateTrueFalse() throws Exception {
+        BooleanValueSet trueOnOddIndex = new BooleanValueSet() {
+            @Override
+            public boolean getBooleanValueFor(int index, int count) {
+                return index % 2 != 0;
+            }
+        };
+        testBooleanStream(numberOfBytes, trueOnOddIndex);
+    }
+
+    /**
+     * Runs the round trip check for every stream length from zero up to, but
+     * not including, {@code numberOfBytes}; the first failure is rethrown with
+     * the failing length in its message and the original error as its cause.
+     */
+    protected void testBooleanStream(int numberOfBytes, BooleanValueSet valueSet) throws Exception {
+        int length = 0;
+        while (length < numberOfBytes) {
+            try {
+                assertMarshalBooleans(length, valueSet);
+            } catch (Throwable cause) {
+                AssertionFailedError failure = new AssertionFailedError("Iteration failed at: " + length);
+                failure.initCause(cause);
+                throw failure;
+            }
+            length++;
+        }
+    }
+
+    /**
+     * Writes {@code count} booleans followed by the end of stream marker, then
+     * reads everything back and verifies the values, the marker and that no
+     * further bytes remain.
+     */
+    protected void assertMarshalBooleans(int count, BooleanValueSet valueSet) throws Exception {
+        BooleanStream written = new BooleanStream();
+        for (int index = 0; index < count; index++) {
+            written.writeBoolean(valueSet.getBooleanValueFor(index, count));
+        }
+
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream ds = new DataOutputStream(buffer);
+        written.marshal(ds);
+        ds.writeInt(endOfStreamMarker);
+        ds.close();
+
+        // now lets read from the stream
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+        BooleanStream read = new BooleanStream();
+        try {
+            read.unmarshal(dis);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail("Failed to unmarshal: " + count + " booleans: " + e);
+        }
+
+        for (int index = 0; index < count; index++) {
+            boolean expected = valueSet.getBooleanValueFor(index, count);
+            try {
+                boolean actual = read.readBoolean();
+                assertEquals("value of object: " + index + " was: " + actual, expected, actual);
+            } catch (IOException e) {
+                e.printStackTrace();
+                fail("Failed to parse boolean: " + index + " out of: " + count + " due to: " + e);
+            }
+        }
+
+        String expectedMarker = Integer.toHexString(endOfStreamMarker);
+        String actualMarker = Integer.toHexString(dis.readInt());
+        assertEquals("Marker int when unmarshalling: " + count + " booleans", expectedMarker, actualMarker);
+
+        // one more read must hit the end of the stream
+        try {
+            dis.readByte();
+            fail("Should have reached the end of the stream");
+        } catch (IOException endOfStream) {
+            // worked!
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/test/java/org/apache/activemq/openwire/codec/NumberRangesWhileMarshallingTest.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/codec/NumberRangesWhileMarshallingTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/codec/NumberRangesWhileMarshallingTest.java
new file mode 100644
index 0000000..8ce0303
--- /dev/null
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/codec/NumberRangesWhileMarshallingTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.commands.CommandTypes;
+import org.apache.activemq.openwire.commands.OpenWireTextMessage;
+import org.apache.activemq.openwire.commands.SessionId;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Marshals objects through an OpenWireFormat and reads them back, covering a
+ * wide range of long values and the max frame size setting.
+ */
+public class NumberRangesWhileMarshallingTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NumberRangesWhileMarshallingTest.class);
+
+    protected String connectionId = "Cheese";
+    protected ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    protected DataOutputStream ds = new DataOutputStream(buffer);
+    protected OpenWireFormat openWireformat;
+    protected int endOfStreamMarker = 0x12345678;
+
+    @Before
+    public void setUp() throws Exception {
+        openWireformat = createOpenWireFormat();
+    }
+
+    protected OpenWireFormat createOpenWireFormat() {
+        OpenWireFormat format = new OpenWireFormat(CommandTypes.PROTOCOL_VERSION);
+        format.setCacheEnabled(true);
+        format.setStackTraceEnabled(false);
+        format.setVersion(1);
+        return format;
+    }
+
+    @Test
+    public void testLongNumberRanges() throws Exception {
+        long[] numberValues = {
+            // values that fit in a byte
+            0, 1, 0x7e, 0x7f, 0x80, 0x81, 0xf0, 0xff,
+            // values around the short range
+            0x7eff, 0x7fffL, 0x8001L, 0x8000L, 0xe000L, 0xe0001L, 0xff00L, 0xffffL,
+            // values around the int range
+            0x10000L, 0x700000L, 0x12345678L, 0x72345678L, 0x7fffffffL, 0x80000000L, 0x80000001L, 0xE0000001L, 0xFFFFFFFFL,
+            // longs needing five to seven bytes
+            0x123456781L, 0x1234567812L, 0x12345678123L, 0x123456781234L, 0x1234567812345L, 0x12345678123456L, 0x7e345678123456L, 0x7fffffffffffffL,
+            0x80000000000000L, 0x80000000000001L, 0xe0000000000001L, 0xffffffffffffffL,
+            // longs needing all eight bytes
+            0x1234567812345678L, 0x7fffffffffffffffL, 0x8000000000000000L, 0x8000000000000001L, 0xe000000000000001L, 0xffffffffffffffffL, 1 };
+
+        // write one SessionId per value, then the marker
+        for (long value : numberValues) {
+            SessionId sessionId = new SessionId();
+            sessionId.setConnectionId(connectionId);
+            sessionId.setValue(value);
+            writeObject(sessionId);
+        }
+        ds.writeInt(endOfStreamMarker);
+        ds.close();
+
+        // now lets read from the stream
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+        for (int index = 0; index < numberValues.length; index++) {
+            String expected = Long.toHexString(numberValues[index]);
+            LOG.info("Unmarshaling value: " + index + " = " + expected);
+
+            SessionId command = (SessionId) openWireformat.unmarshal(dis);
+            assertEquals("connection ID in object: " + index, connectionId, command.getConnectionId());
+            String actual = Long.toHexString(command.getValue());
+            assertEquals("value of object: " + index + " was: " + actual, expected, actual);
+        }
+
+        String expectedMarker = Integer.toHexString(endOfStreamMarker);
+        String actualMarker = Integer.toHexString(dis.readInt());
+        assertEquals("Marker int", expectedMarker, actualMarker);
+
+        // one more read must hit the end of the stream
+        try {
+            byte value = dis.readByte();
+            fail("Should have reached the end of the stream: " + value);
+        } catch (IOException endOfStream) {
+            // worked!
+        }
+    }
+
+    @Test
+    public void testMaxFrameSize() throws Exception {
+        OpenWireFormat limited = new OpenWireFormat(CommandTypes.PROTOCOL_VERSION);
+        limited.setMaxFrameSize(10);
+
+        OpenWireTextMessage msg = new OpenWireTextMessage();
+        msg.setText("This is a test");
+
+        // marshalled with the unrestricted format created in setUp()
+        writeObject(msg);
+        ds.writeInt(endOfStreamMarker);
+        ds.close();
+
+        // now lets read from the stream using the restricted format
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+        try {
+            limited.unmarshal(dis);
+            fail("Should fail because of the large frame size");
+        } catch (IOException frameTooLarge) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testDefaultMaxFrameSizeUnlimited() {
+        OpenWireFormat defaults = new OpenWireFormat(CommandTypes.PROTOCOL_VERSION);
+        assertEquals(Long.MAX_VALUE, defaults.getMaxFrameSize());
+    }
+
+    private void writeObject(Object object) throws IOException {
+        openWireformat.marshal(object, ds);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java
new file mode 100644
index 0000000..286fa22
--- /dev/null
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/DataStructureTestSupport.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+import junit.framework.AssertionFailedError;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.commands.Command;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Before;
+
+/**
+ * Base class for tests that check OpenWire data structures survive a marshal /
+ * unmarshal round trip through an {@link OpenWireFormat}.
+ */
+public abstract class DataStructureTestSupport {
+
+    protected boolean cacheEnabled;
+    protected OpenWireFormat wireFormat;
+
+    @Before
+    public void setUp() throws Exception {
+        wireFormat = createWireFormat();
+    }
+
+    protected OpenWireFormat createWireFormat() {
+        OpenWireFormat answer = new OpenWireFormat(10);
+        answer.setCacheEnabled(cacheEnabled);
+        return answer;
+    }
+
+    /**
+     * Marshals the given object with the supplied wire format and immediately
+     * unmarshals the resulting buffer again.
+     *
+     * @param original
+     *        the object to send through the wire format.
+     * @param wireFormat
+     *        the wire format used for both directions.
+     *
+     * @return the object produced by unmarshalling.
+     *
+     * @throws IOException if marshalling or unmarshalling fails.
+     */
+    protected Object marshalAndUnmarshall(Object original, OpenWireFormat wireFormat) throws IOException {
+        Buffer packet = wireFormat.marshal(original);
+        return wireFormat.unmarshal(packet);
+    }
+
+    /**
+     * Round trips the given object through {@link #wireFormat} and asserts that
+     * the result is non-null and equal to the original according to
+     * {@link #assertEquals(Object, Object)}.
+     *
+     * @param original
+     *        the object to round trip.
+     *
+     * @throws IOException if marshalling or unmarshalling fails.
+     */
+    public void assertBeanMarshalls(Object original) throws IOException {
+        Object o = marshalAndUnmarshall(original, wireFormat);
+        assertNotNull(o);
+        assertEquals(original, o);
+    }
+
+    /**
+     * Deep equality assertion.  Arrays are compared element by element,
+     * {@link Command} instances are compared property by property (only
+     * properties that have both a getter and a matching setter), and anything
+     * else is compared with {@code org.junit.Assert.assertEquals}.
+     *
+     * @param expect
+     *        the expected value, may be null.
+     * @param was
+     *        the actual value, may be null.
+     *
+     * @throws AssertionFailedError if the two values differ.
+     */
+    public static void assertEquals(Object expect, Object was) {
+        if (expect == null ^ was == null) {
+            throw new AssertionFailedError("Not equals, expected: " + expect + ", was: " + was);
+        }
+
+        if (expect == null) {
+            return;
+        }
+
+        if (expect.getClass() != was.getClass()) {
+            throw new AssertionFailedError("Not equals, classes don't match. expected: " + expect.getClass() + ", was: " + was.getClass());
+        }
+
+        if (expect.getClass().isArray()) {
+            assertArraysMatch(expect, was);
+        } else if (expect instanceof Command) {
+            assertCommandsMatch(expect, was);
+        } else {
+            org.junit.Assert.assertEquals(expect, was);
+        }
+    }
+
+    /** Compares two arrays of the same class, recursing into object arrays. */
+    private static void assertArraysMatch(Object expect, Object was) {
+        Class<?> componentType = expect.getClass().getComponentType();
+        if (componentType.isPrimitive()) {
+            if (!primitiveArraysEqual(componentType, expect, was)) {
+                throw new AssertionFailedError("Arrays not equal");
+            }
+            return;
+        }
+
+        Object[] expectArray = (Object[]) expect;
+        Object[] wasArray = (Object[]) was;
+        if (expectArray.length != wasArray.length) {
+            throw new AssertionFailedError("Not equals, array lengths don't match. expected: " + expectArray.length + ", was: " + wasArray.length);
+        }
+        for (int i = 0; i < wasArray.length; i++) {
+            assertEquals(expectArray[i], wasArray[i]);
+        }
+    }
+
+    /** Dispatches to the Arrays.equals overload for the given primitive component type. */
+    private static boolean primitiveArraysEqual(Class<?> componentType, Object expect, Object was) {
+        if (componentType == boolean.class) {
+            // previously unhandled, so boolean[] values always failed
+            return Arrays.equals((boolean[]) expect, (boolean[]) was);
+        }
+        if (componentType == byte.class) {
+            return Arrays.equals((byte[]) expect, (byte[]) was);
+        }
+        if (componentType == char.class) {
+            return Arrays.equals((char[]) expect, (char[]) was);
+        }
+        if (componentType == short.class) {
+            return Arrays.equals((short[]) expect, (short[]) was);
+        }
+        if (componentType == int.class) {
+            return Arrays.equals((int[]) expect, (int[]) was);
+        }
+        if (componentType == long.class) {
+            return Arrays.equals((long[]) expect, (long[]) was);
+        }
+        if (componentType == double.class) {
+            return Arrays.equals((double[]) expect, (double[]) was);
+        }
+        if (componentType == float.class) {
+            return Arrays.equals((float[]) expect, (float[]) was);
+        }
+        return false;
+    }
+
+    /** Compares every property of two Commands that has both a getter and a matching setter. */
+    private static void assertCommandsMatch(Object expect, Object was) {
+        Method[] methods = expect.getClass().getMethods();
+        for (Method getter : methods) {
+            String setterName = setterNameFor(getter);
+            if (setterName == null) {
+                continue;
+            }
+
+            // Only compare properties that also expose a setter.
+            try {
+                expect.getClass().getMethod(setterName, getter.getReturnType());
+            } catch (NoSuchMethodException noSetter) {
+                continue;
+            }
+
+            try {
+                // Getters take no arguments.  Passing "(Object) null" here made
+                // every call fail with IllegalArgumentException, which was then
+                // silently ignored, so no property was ever compared.
+                assertEquals(getter.invoke(expect), getter.invoke(was));
+            } catch (IllegalAccessException inaccessible) {
+                // best effort: skip getters that cannot be called reflectively
+            } catch (InvocationTargetException getterFailed) {
+                // best effort: skip getters that throw
+            }
+        }
+    }
+
+    /**
+     * Returns the setter name matching a zero argument, non-void "get" or "is"
+     * method, or null when the method is not such a getter.
+     */
+    private static String setterNameFor(Method method) {
+        if (method.getParameterTypes().length != 0 || method.getReturnType() == void.class) {
+            return null;
+        }
+        String name = method.getName();
+        if (name.startsWith("get")) {
+            return "set" + name.substring(3);
+        }
+        if (name.startsWith("is")) {
+            return "set" + name.substring(2);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/test/java/org/apache/activemq/openwire/commands/MessageTest.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/MessageTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/MessageTest.java
new file mode 100644
index 0000000..8e3494a
--- /dev/null
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/MessageTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.openwire.commands.MessageId;
+import org.apache.activemq.openwire.commands.OpenWireMessage;
+import org.apache.activemq.openwire.commands.OpenWireQueue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Round trip marshalling tests for OpenWireMessage and MessageId, executed once
+ * with the wire format cache enabled and once with it disabled.
+ */
+@RunWith(Parameterized.class)
+public class MessageTest extends DataStructureTestSupport {
+
+    public MessageTest(Boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } });
+    }
+
+    @Test
+    public void testOpenWireMessageMarshaling() throws IOException {
+        assertBeanMarshalls(createMessage(new MessageId("c1:1:1", 1)));
+    }
+
+    @Test
+    public void testOpenWireMessageMarshalingBigMessageId() throws IOException {
+        assertBeanMarshalls(createMessage(new MessageId("c1:1:1", Short.MAX_VALUE)));
+    }
+
+    @Test
+    public void testOpenWireMessageMarshalingBiggerMessageId() throws IOException {
+        assertBeanMarshalls(createMessage(new MessageId("c1:1:1", Integer.MAX_VALUE)));
+    }
+
+    @Test
+    public void testOpenWireMessageMarshalingBiggestMessageId() throws IOException {
+        assertBeanMarshalls(createMessage(new MessageId("c1:1:1", Long.MAX_VALUE)));
+    }
+
+    @Test
+    public void testMessageIdMarshaling() throws IOException {
+        assertBeanMarshalls(new MessageId("c1:1:1", 1));
+    }
+
+    @Test
+    public void testPropRemove() throws Exception {
+        OpenWireMessage original = new OpenWireMessage();
+        original.setProperty("RM","RM");
+
+        OpenWireMessage firstCopy = (OpenWireMessage) marshalAndUnmarshall(original, wireFormat);
+
+        // look up an absent property before removing the one that was set
+        firstCopy.getProperty("NA");
+        firstCopy.removeProperty("RM");
+
+        OpenWireMessage secondCopy = (OpenWireMessage) marshalAndUnmarshall(firstCopy, wireFormat);
+        assertNull("Prop is gone", secondCopy.getProperty("RM"));
+    }
+
+    /**
+     * Builds the message shared by the marshalling tests; only the message id
+     * differs between them.
+     */
+    private static OpenWireMessage createMessage(MessageId messageId) {
+        OpenWireMessage message = new OpenWireMessage();
+        message.setCommandId((short)1);
+        message.setOriginalDestination(new OpenWireQueue("queue"));
+        message.setGroupID("group");
+        message.setGroupSequence(4);
+        message.setCorrelationId("correlation");
+        message.setMessageId(messageId);
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireBytesMessageTest.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireBytesMessageTest.java b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireBytesMessageTest.java
new file mode 100644
index 0000000..88af005
--- /dev/null
+++ b/openwire-core/src/test/java/org/apache/activemq/openwire/commands/OpenWireBytesMessageTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.activemq.openwire.commands.CommandTypes;
+import org.apache.activemq.openwire.commands.OpenWireBytesMessage;
+import org.junit.Test;
+
+/**
+ * Tests for OpenWireBytesMessage: data structure type, body length and body
+ * compression.
+ */
+public class OpenWireBytesMessageTest {
+
+    // The following text should compress well
+    private static final String TEXT = "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+            + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. ";
+
+    @Test
+    public void testGetDataStructureType() {
+        OpenWireBytesMessage msg = new OpenWireBytesMessage();
+        // expected value first so a failure message reads correctly
+        assertEquals(CommandTypes.OPENWIRE_BYTES_MESSAGE, msg.getDataStructureType());
+    }
+
+    @Test
+    public void testGetBodyLength() throws Exception {
+        OpenWireBytesMessage msg = new OpenWireBytesMessage();
+
+        byte[] data = new byte[80];
+
+        // int counter: a byte counter could never exceed 127 and would loop
+        // forever if the array were ever made larger
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) i;
+        }
+
+        msg.setBodyBytes(data);
+
+        // assertEquals reports the actual length on failure, unlike assertTrue
+        long bodyLength = msg.getBodyLength();
+        assertEquals(80L, bodyLength);
+    }
+
+    @Test
+    public void testBodyCompression() throws Exception {
+        OpenWireBytesMessage message = new OpenWireBytesMessage();
+        message.setUseCompression(true);
+        message.setBodyBytes(TEXT.getBytes("UTF8"));
+
+        int compressedSize = message.getContent().getLength();
+        byte[] bytes = message.getBodyBytes();
+
+        assertTrue("compressed size " + compressedSize + " should be below " + bytes.length, compressedSize < bytes.length);
+
+        String rcvString = new String(bytes, "UTF8");
+        assertEquals(TEXT.length(), rcvString.length());
+        assertEquals(TEXT, rcvString);
+        assertTrue(message.isCompressed());
+    }
+}
\ No newline at end of file


Mime
View raw message