activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [35/51] [partial] https://issues.apache.org/jira/browse/OPENWIRE-1
Date Thu, 24 Jul 2014 14:23:24 GMT
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java
new file mode 100644
index 0000000..b1bda9b
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Acknowledgement sent from a broker to a producer to confirm that messages
+ * the producer sent have been received and processed. A producer that does not
+ * get ProducerAck commands back from the broker will be flow controlled.
+ *
+ * @openwire:marshaller code="19" version="3"
+ */
+public class ProducerAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK;
+
+    protected ProducerId producerId;
+    protected int size;
+
+    /**
+     * Creates an ack with no producer id and a size of zero.
+     */
+    public ProducerAck() {
+    }
+
+    /**
+     * Creates an ack for the given producer.
+     *
+     * @param producerId
+     *        the id of the producer being acknowledged.
+     * @param size
+     *        the number of bytes being acknowledged.
+     */
+    public ProducerAck(ProducerId producerId, int size) {
+        this.size = size;
+        this.producerId = producerId;
+    }
+
+    /**
+     * Copies the state of this ack into the given target.
+     *
+     * @param copy
+     *        the instance that receives this ack's values.
+     */
+    public void copy(ProducerAck copy) {
+        super.copy(copy);
+        copy.size = this.size;
+        copy.producerId = this.producerId;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processProducerAck(this);
+    }
+
+    /**
+     * The producer id that this ack message is destined for.
+     *
+     * @openwire:property version=3
+     */
+    public ProducerId getProducerId() {
+        return this.producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * The number of bytes that are being acked.
+     *
+     * @openwire:property version=3
+     */
+    public int getSize() {
+        return this.size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java
new file mode 100644
index 0000000..bd9283e
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Identifies a producer by its connection id, session id and producer value.
+ * The cached hash code, string key and parent id are derived lazily from
+ * those three fields and are discarded whenever one of them is changed
+ * through a setter.
+ *
+ * @openwire:marshaller code="123"
+ *
+ */
+public class ProducerId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID;
+
+    protected String connectionId;
+    protected long sessionId;
+    protected long value;
+
+    protected transient int hashCode;
+    protected transient String key;
+    protected transient SessionId parentId;
+
+    /**
+     * Creates an id with no connection id and zero session and producer values.
+     */
+    public ProducerId() {
+    }
+
+    /**
+     * Creates an id for a producer that belongs to the given session.
+     *
+     * @param sessionId
+     *        the session that supplies the connection id and session value.
+     * @param producerId
+     *        the producer value within that session.
+     */
+    public ProducerId(SessionId sessionId, long producerId) {
+        this.connectionId = sessionId.getConnectionId();
+        this.sessionId = sessionId.getValue();
+        this.value = producerId;
+    }
+
+    /**
+     * Creates a copy of the given id.
+     *
+     * @param id
+     *        the id whose connection id, session id and value are copied.
+     */
+    public ProducerId(ProducerId id) {
+        this.connectionId = id.getConnectionId();
+        this.sessionId = id.getSessionId();
+        this.value = id.getValue();
+    }
+
+    /**
+     * Creates an id from its string form, <code>connectionId:sessionId:value</code>.
+     * The text after the last ':' becomes the producer value and the text
+     * after the ':' before that becomes the session id; whatever remains is
+     * the connection id.
+     *
+     * @param producerKey
+     *        the string form of the id.
+     *
+     * @throws NumberFormatException
+     *         if the producer value or session id portion is not a valid long.
+     */
+    public ProducerId(String producerKey) {
+        // Parse off the producerId
+        int p = producerKey.lastIndexOf(":");
+        if (p >= 0) {
+            value = Long.parseLong(producerKey.substring(p + 1));
+            producerKey = producerKey.substring(0, p);
+        }
+        setProducerSessionKey(producerKey);
+    }
+
+    /**
+     * @return the id of the session that owns this producer, created on first use.
+     */
+    public SessionId getParentId() {
+        if (parentId == null) {
+            parentId = new SessionId(this);
+        }
+        return parentId;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            // A default constructed id has no connection id yet, treat it as zero
+            // rather than failing with a NullPointerException.
+            int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
+            hashCode = connectionHash ^ (int)sessionId ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != ProducerId.class) {
+            return false;
+        }
+        ProducerId id = (ProducerId)o;
+        if (sessionId != id.sessionId || value != id.value) {
+            return false;
+        }
+        // Null safe comparison, two ids without a connection id are equal.
+        return connectionId == null ? id.connectionId == null : connectionId.equals(id.connectionId);
+    }
+
+    /**
+     * Splits a <code>connectionId:sessionId</code> key into its two parts.
+     *
+     * @param sessionKey
+     *        the key to split; without a ':' it is used whole as the connection id.
+     */
+    private void setProducerSessionKey(String sessionKey) {
+        // Parse off the value
+        int p = sessionKey.lastIndexOf(":");
+        if (p >= 0) {
+            sessionId = Long.parseLong(sessionKey.substring(p + 1));
+            sessionKey = sessionKey.substring(0, p);
+        }
+        // The rest is the connection id
+        connectionId = sessionKey;
+    }
+
+    /**
+     * Drops the lazily computed hash code, string key and parent id so that
+     * they are rebuilt from the current field values.
+     */
+    private void clearCachedState() {
+        hashCode = 0;
+        key = null;
+        parentId = null;
+    }
+
+    @Override
+    public String toString() {
+        if (key == null) {
+            key = connectionId + ":" + sessionId + ":" + value;
+        }
+        return key;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+        clearCachedState();
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long producerId) {
+        this.value = producerId;
+        clearCachedState();
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(long sessionId) {
+        this.sessionId = sessionId;
+        clearCachedState();
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java
new file mode 100644
index 0000000..23758b0
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+
+/**
+ * Describes a producer: its id, destination, broker path, async dispatch
+ * setting and window size. A visitor handles this command through its
+ * processAddProducer method.
+ *
+ * @openwire:marshaller code="6"
+ */
+public class ProducerInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO;
+
+    protected ProducerId producerId;
+    protected OpenWireDestination destination;
+    protected BrokerId[] brokerPath;
+    protected boolean dispatchAsync;
+    protected int windowSize;
+
+    /**
+     * Creates an info with no producer id set.
+     */
+    public ProducerInfo() {
+    }
+
+    /**
+     * Creates an info for the given producer id.
+     *
+     * @param producerId
+     *        the id of the producer this info describes.
+     */
+    public ProducerInfo(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * Creates an info whose producer id is built from the session's id.
+     *
+     * @param sessionInfo
+     *        the session that owns the producer.
+     * @param producerId
+     *        the producer value within that session.
+     */
+    public ProducerInfo(SessionInfo sessionInfo, long producerId) {
+        this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId);
+    }
+
+    /**
+     * @return a new ProducerInfo populated from this one via {@link #copy(ProducerInfo)}.
+     */
+    public ProducerInfo copy() {
+        ProducerInfo info = new ProducerInfo();
+        copy(info);
+        return info;
+    }
+
+    /**
+     * Copies the state of this info into the given target. All fields are
+     * copied, the broker path array is shared with the target and not cloned.
+     *
+     * @param info
+     *        the instance that receives this info's values.
+     */
+    public void copy(ProducerInfo info) {
+        super.copy(info);
+        info.producerId = producerId;
+        info.destination = destination;
+        info.brokerPath = brokerPath;
+        info.dispatchAsync = dispatchAsync;
+        info.windowSize = windowSize;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public OpenWireDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @return a RemoveInfo for this producer's id that carries the same
+     *         response required setting as this command.
+     */
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getProducerId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddProducer(this);
+    }
+
+    /**
+     * If the broker should dispatch messages from this producer async. Since
+     * sync dispatch could potentially block the producer thread, this could be
+     * an important setting for the producer.
+     *
+     * @openwire:property version=2
+     */
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
+    }
+
+    public void setDispatchAsync(boolean dispatchAsync) {
+        this.dispatchAsync = dispatchAsync;
+    }
+
+    /**
+     * Used to configure the producer window size. A producer will send up to
+     * the configured window size worth of payload data to the broker before
+     * waiting for an Ack that allows him to send more.
+     *
+     * @openwire:property version=3
+     */
+    public int getWindowSize() {
+        return windowSize;
+    }
+
+    public void setWindowSize(int windowSize) {
+        this.windowSize = windowSize;
+    }
+
+    @Override
+    public boolean isProducerInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java
new file mode 100644
index 0000000..9aa4b5e
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import java.io.IOException;
+
+/**
+ * Removes a consumer, producer, session or connection.
+ *
+ * @openwire:marshaller code="12"
+ */
+public class RemoveInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
+
+    protected DataStructure objectId;
+    protected long lastDeliveredSequenceId;
+
+    /**
+     * Creates a remove command with no object id set.
+     */
+    public RemoveInfo() {
+    }
+
+    /**
+     * Creates a remove command for the given id.
+     *
+     * @param objectId
+     *        the id of the connection, session, consumer or producer to remove.
+     */
+    public RemoveInfo(DataStructure objectId) {
+        this.objectId = objectId;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public DataStructure getObjectId() {
+        return objectId;
+    }
+
+    public void setObjectId(DataStructure objectId) {
+        this.objectId = objectId;
+    }
+
+    /**
+     * @openwire:property version=5 cache=false
+     */
+    public long getLastDeliveredSequenceId() {
+        return lastDeliveredSequenceId;
+    }
+
+    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
+        this.lastDeliveredSequenceId = lastDeliveredSequenceId;
+    }
+
+    /**
+     * Dispatches to the visitor method that matches the type of the object id.
+     *
+     * @throws IOException
+     *         if no object id is set or its type is not a connection, session,
+     *         consumer or producer id.
+     */
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        if (objectId == null) {
+            // Report a missing id the same way as an unknown one instead of a bare NullPointerException.
+            throw new IOException("Unknown remove command type: no object id set");
+        }
+        switch (objectId.getDataStructureType()) {
+        case ConnectionId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveConnection((ConnectionId)objectId, lastDeliveredSequenceId);
+        case SessionId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveSession((SessionId)objectId, lastDeliveredSequenceId);
+        case ConsumerId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveConsumer((ConsumerId)objectId, lastDeliveredSequenceId);
+        case ProducerId.DATA_STRUCTURE_TYPE:
+            return visitor.processRemoveProducer((ProducerId)objectId);
+        default:
+            throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
+        }
+    }
+
+    /**
+     * Returns true if this event is for a removed connection
+     */
+    public boolean isConnectionRemove() {
+        return isRemoveOf(ConnectionId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if this event is for a removed session
+     */
+    public boolean isSessionRemove() {
+        return isRemoveOf(SessionId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if this event is for a removed consumer
+     */
+    public boolean isConsumerRemove() {
+        return isRemoveOf(ConsumerId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if this event is for a removed producer
+     */
+    public boolean isProducerRemove() {
+        return isRemoveOf(ProducerId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Null safe type check shared by the is...Remove methods.
+     *
+     * @param type
+     *        the data structure type to compare against.
+     *
+     * @return true if an object id is set and it has the given type.
+     */
+    private boolean isRemoveOf(byte type) {
+        return objectId != null && objectId.getDataStructureType() == type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java
new file mode 100644
index 0000000..5e04db8
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Command carrying a connection id, a client id and a subscription name. A
+ * visitor handles it through its processRemoveSubscription method.
+ *
+ * @openwire:marshaller code="9"
+ */
+public class RemoveSubscriptionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
+
+    protected ConnectionId connectionId;
+    protected String clientId;
+    protected String subscriptionName;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Misspelled accessor that carries the openwire property tag, it returns
+     * the same value as {@link #getSubscriptionName()}.
+     *
+     * @openwire:property version=1
+     */
+    public String getSubcriptionName() {
+        return this.subscriptionName;
+    }
+
+    /**
+     * Misspelled counterpart of {@link #setSubscriptionName(String)}.
+     */
+    public void setSubcriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @return the name of the subscription.
+     */
+    public String getSubscriptionName() {
+        return this.subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return this.clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processRemoveSubscription(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java
new file mode 100644
index 0000000..9d1b4c8
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * General purpose command asking some kind of producer to replay ranges of
+ * messages. It is typically used over a non-reliable transport such as UDP or
+ * multicast, but could also be used on TCP/IP once a socket has been
+ * re-established.
+ *
+ * @openwire:marshaller code="65"
+ */
+public class ReplayCommand extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY;
+
+    private String producerId;
+    private int firstAckNumber;
+    private int lastAckNumber;
+    private int firstNakNumber;
+    private int lastNakNumber;
+
+    public ReplayCommand() {
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getProducerId() {
+        return this.producerId;
+    }
+
+    /**
+     * Is used to uniquely identify the producer of the sequence
+     *
+     * @openwire:property version=1 cache=false
+     */
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getFirstAckNumber() {
+        return this.firstAckNumber;
+    }
+
+    /**
+     * Is used to specify the first sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     *
+     * @openwire:property version=1
+     */
+    public void setFirstAckNumber(int firstSequenceNumber) {
+        this.firstAckNumber = firstSequenceNumber;
+    }
+
+    public int getLastAckNumber() {
+        return this.lastAckNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number being acknowledged as delivered on the transport
+     * so that it can be removed from cache
+     *
+     * @openwire:property version=1
+     */
+    public void setLastAckNumber(int lastSequenceNumber) {
+        this.lastAckNumber = lastSequenceNumber;
+    }
+
+    /**
+     * Visitors are not notified of replay commands.
+     *
+     * @return always null.
+     */
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+
+    /**
+     * Is used to specify the first sequence number to be replayed
+     *
+     * @openwire:property version=1
+     */
+    public int getFirstNakNumber() {
+        return this.firstNakNumber;
+    }
+
+    public void setFirstNakNumber(int firstNakNumber) {
+        this.firstNakNumber = firstNakNumber;
+    }
+
+    /**
+     * Is used to specify the last sequence number to be replayed
+     *
+     * @openwire:property version=1
+     */
+    public int getLastNakNumber() {
+        return this.lastNakNumber;
+    }
+
+    public void setLastNakNumber(int lastNakNumber) {
+        this.lastNakNumber = lastNakNumber;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("ReplayCommand {commandId = ");
+        text.append(getCommandId());
+        text.append(", firstNakNumber = ").append(getFirstNakNumber());
+        text.append(", lastNakNumber = ").append(getLastNakNumber());
+        text.append("}");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java
new file mode 100644
index 0000000..15e8508
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Command that identifies itself as a response and carries a correlation id.
+ *
+ * @openwire:marshaller code="30"
+ */
+public class Response extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE;
+    int correlationId;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getCorrelationId() {
+        return this.correlationId;
+    }
+
+    public void setCorrelationId(int responseId) {
+        this.correlationId = responseId;
+    }
+
+    /**
+     * @return always true.
+     */
+    @Override
+    public boolean isResponse() {
+        return true;
+    }
+
+    /**
+     * @return false, this base response does not report itself as an exception.
+     */
+    public boolean isException() {
+        return false;
+    }
+
+    /**
+     * Visitors are not notified of responses.
+     *
+     * @return always null.
+     */
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java
new file mode 100644
index 0000000..d30b6b2
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Identifies a session by its connection id and a session value. The cached
+ * hash code, string key and parent id are derived lazily from those fields
+ * and are discarded whenever one of them is changed through a setter.
+ *
+ * @openwire:marshaller code="121"
+ */
+public class SessionId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
+
+    protected String connectionId;
+    protected long value;
+
+    protected transient int hashCode;
+    protected transient String key;
+    protected transient ConnectionId parentId;
+
+    /**
+     * Creates an id with no connection id and a zero session value.
+     */
+    public SessionId() {
+    }
+
+    /**
+     * Creates an id for a session that belongs to the given connection.
+     *
+     * @param connectionId
+     *        the connection whose value becomes this id's connection id.
+     * @param sessionId
+     *        the session value within that connection.
+     */
+    public SessionId(ConnectionId connectionId, long sessionId) {
+        this.connectionId = connectionId.getValue();
+        this.value = sessionId;
+    }
+
+    /**
+     * Creates a copy of the given id.
+     *
+     * @param id
+     *        the id whose connection id and value are copied.
+     */
+    public SessionId(SessionId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getValue();
+    }
+
+    /**
+     * Creates the id of the session that owns the given producer.
+     *
+     * @param id
+     *        the producer id that supplies the connection id and session id.
+     */
+    public SessionId(ProducerId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getSessionId();
+    }
+
+    /**
+     * Creates the id of the session that owns the given consumer.
+     *
+     * @param id
+     *        the consumer id that supplies the connection id and session id.
+     */
+    public SessionId(ConsumerId id) {
+        this.connectionId = id.getConnectionId();
+        this.value = id.getSessionId();
+    }
+
+    /**
+     * @return the id of the connection that owns this session, created on first use.
+     */
+    public ConnectionId getParentId() {
+        if (parentId == null) {
+            parentId = new ConnectionId(this);
+        }
+        return parentId;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            // A default constructed id has no connection id yet, treat it as zero
+            // rather than failing with a NullPointerException.
+            int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
+            hashCode = connectionHash ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != SessionId.class) {
+            return false;
+        }
+        SessionId id = (SessionId)o;
+        // Null safe comparison, two ids without a connection id are equal.
+        return value == id.value && (connectionId == null ? id.connectionId == null : connectionId.equals(id.connectionId));
+    }
+
+    /**
+     * Drops the lazily computed hash code, string key and parent id so that
+     * they are rebuilt from the current field values.
+     */
+    private void clearCachedState() {
+        hashCode = 0;
+        key = null;
+        parentId = null;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+        clearCachedState();
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long sessionId) {
+        this.value = sessionId;
+        clearCachedState();
+    }
+
+    @Override
+    public String toString() {
+        if (key == null) {
+            key = connectionId + ":" + value;
+        }
+        return key;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java
new file mode 100644
index 0000000..4064c3f
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * @openwire:marshaller code="4"
+ */
+public class SessionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;
+
+    protected SessionId sessionId;
+
+    public SessionInfo() {
+        sessionId = new SessionId();
+    }
+
+    public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
+        this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId);
+    }
+
+    public SessionInfo(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public SessionId getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(SessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getSessionId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddSession(this);
+    }
+
+    @Override
+    public boolean isSessionInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java
new file mode 100644
index 0000000..72f67e0
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * @openwire:marshaller code="11"
+ */
+public class ShutdownInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processShutdown(this);
+    }
+
+    @Override
+    public boolean isShutdownInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java
new file mode 100644
index 0000000..41ffcf6
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Used to represent a durable subscription.
+ *
+ * @openwire:marshaller code="55"
+ */
+public class SubscriptionInfo implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO;
+
+    protected OpenWireDestination subscribedDestination;
+    protected OpenWireDestination destination;
+    protected String clientId;
+    protected String subscriptionName;
+    protected String selector;
+
+    public SubscriptionInfo() {
+    }
+
+    public SubscriptionInfo(String clientId, String subscriptionName) {
+        this.clientId = clientId;
+        this.subscriptionName = subscriptionName;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * This is the resolved destination that the subscription is receiving messages from. This
+     * will never be a pattern or a composite destination.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public OpenWireDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getSubcriptionName() {
+        return subscriptionName;
+    }
+
+    /**
+     * @param subscriptionName
+     *        the subscription name to store
+     */
+    public void setSubcriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " { " + destination + " }";
+    }
+
+    @Override
+    public int hashCode() {
+        int h1 = clientId != null ? clientId.hashCode() : -1;
+        int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1;
+        return h1 ^ h2;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof SubscriptionInfo) {
+            SubscriptionInfo other = (SubscriptionInfo) obj;
+            result = (clientId == null && other.clientId == null ||
+                      clientId != null && other.clientId != null && clientId.equals(other.clientId)) &&
+                     (subscriptionName == null && other.subscriptionName == null ||
+                      subscriptionName != null && other.subscriptionName != null && subscriptionName.equals(other.subscriptionName));
+        }
+        return result;
+    }
+
+    /**
+     * The destination the client originally subscribed to. This may not match the result of
+     * {@link #getDestination()} if the subscribed destination uses patterns or composites.
+     *
+     * If the subscribed destination is not set, this just returns the destination.
+     *
+     * @openwire:property version=3
+     */
+    public OpenWireDestination getSubscribedDestination() {
+        if (subscribedDestination == null) {
+            return getDestination();
+        }
+        return subscribedDestination;
+    }
+
+    public void setSubscribedDestination(OpenWireDestination subscribedDestination) {
+        this.subscribedDestination = subscribedDestination;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java
new file mode 100644
index 0000000..71be56c
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * @openwire:marshaller
+ */
+public abstract class TransactionId implements DataStructure {
+
+    public abstract boolean isXATransaction();
+    public abstract boolean isLocalTransaction();
+    public abstract String getTransactionKey();
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java
new file mode 100644
index 0000000..698d5e2
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import java.io.IOException;
+
+/**
+ * @openwire:marshaller code="7"
+ */
+public class TransactionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO;
+
+    public static final byte BEGIN = 0;
+    public static final byte PREPARE = 1;
+    public static final byte COMMIT_ONE_PHASE = 2;
+    public static final byte COMMIT_TWO_PHASE = 3;
+    public static final byte ROLLBACK = 4;
+    public static final byte RECOVER = 5;
+    public static final byte FORGET = 6;
+    public static final byte END = 7;
+
+    protected byte type;
+    protected ConnectionId connectionId;
+    protected TransactionId transactionId;
+
+    public TransactionInfo() {
+    }
+
+    public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
+        this.connectionId = connectionId;
+        this.transactionId = transactionId;
+        this.type = type;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        switch (type) {
+        case TransactionInfo.BEGIN:
+            return visitor.processBeginTransaction(this);
+        case TransactionInfo.END:
+            return visitor.processEndTransaction(this);
+        case TransactionInfo.PREPARE:
+            return visitor.processPrepareTransaction(this);
+        case TransactionInfo.COMMIT_ONE_PHASE:
+            return visitor.processCommitTransactionOnePhase(this);
+        case TransactionInfo.COMMIT_TWO_PHASE:
+            return visitor.processCommitTransactionTwoPhase(this);
+        case TransactionInfo.ROLLBACK:
+            return visitor.processRollbackTransaction(this);
+        case TransactionInfo.RECOVER:
+            return visitor.processRecoverTransactions(this);
+        case TransactionInfo.FORGET:
+            return visitor.processForgetTransaction(this);
+        default:
+            throw new IOException("Transaction info type unknown: " + type);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java
new file mode 100644
index 0000000..f6ceca0
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+
+/**
+ * @openwire:marshaller code="1"
+ */
+public class WireFormatInfo implements Command, MarshallAware {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
+    private static final int MAX_PROPERTY_SIZE = 1024 * 4;
+    private static final byte MAGIC[] = new byte[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
+
+    protected byte magic[] = MAGIC;
+    protected int version;
+    protected Buffer marshalledProperties;
+
+    protected transient Map<String, Object> properties;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public boolean isWireFormatInfo() {
+        return true;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 size=8 testSize=-1
+     */
+    public byte[] getMagic() {
+        return magic;
+    }
+
+    public void setMagic(byte[] magic) {
+        this.magic = magic;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(Buffer marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    // ////////////////////
+    // Implementation Methods.
+    // ////////////////////
+
+    public Object getProperty(String name) throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return properties.get(name);
+    }
+
+    /** Returns a read-only view of the properties, unmarshalling them on first access. */
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return Collections.emptyMap();
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    public void clearProperties() {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    public void setProperty(String name, Object value) throws IOException {
+        lazyCreateProperties();
+        properties.put(name, value);
+    }
+
+    /** Ensures the mutable property map exists and discards the now-outdated marshalled buffer. */
+    protected void lazyCreateProperties() throws IOException {
+        if (properties == null) {
+            properties = marshalledProperties == null ? new HashMap<String, Object>() : unmarsallProperties(marshalledProperties);
+        }
+        // Clear the buffer even when the map already existed: getProperty()/getProperties()
+        // unmarshal without clearing it, and beforeMarshall() only re-marshals when it is
+        // null, so leaving it in place would silently drop properties set afterwards.
+        marshalledProperties = null;
+    }
+
+    private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
+        return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
+    }
+
+    @Override
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        // Need to marshal the properties.
+        if (marshalledProperties == null && properties != null) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            OpenWireMarshallingSupport.marshalPrimitiveMap(properties, os);
+            os.close();
+            marshalledProperties = baos.toBuffer();
+        }
+    }
+
+    @Override
+    public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    @Override
+    public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    @Override
+    public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    public boolean isValid() {
+        return magic != null && Arrays.equals(magic, MAGIC);
+    }
+
+    @Override
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    /**
+     * @throws IOException if the marshalled properties cannot be unmarshalled
+     */
+    public boolean isCacheEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("CacheEnabled")); // equals, not ==: any Boolean instance may be stored
+    }
+
+    public void setCacheEnabled(boolean cacheEnabled) throws IOException {
+        setProperty("CacheEnabled", cacheEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException if the marshalled properties cannot be unmarshalled
+     */
+    public boolean isStackTraceEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("StackTraceEnabled")); // equals, not ==: any Boolean instance may be stored
+    }
+
+    public void setStackTraceEnabled(boolean stackTraceEnabled) throws IOException {
+        setProperty("StackTraceEnabled", stackTraceEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException if the marshalled properties cannot be unmarshalled
+     */
+    public boolean isTcpNoDelayEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("TcpNoDelayEnabled")); // equals, not ==: any Boolean instance may be stored
+    }
+
+    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) throws IOException {
+        setProperty("TcpNoDelayEnabled", tcpNoDelayEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException if the marshalled properties cannot be unmarshalled
+     */
+    public boolean isSizePrefixDisabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("SizePrefixDisabled")); // equals, not ==: any Boolean instance may be stored
+    }
+
+    public void setSizePrefixDisabled(boolean prefixPacketSize) throws IOException {
+        setProperty("SizePrefixDisabled", prefixPacketSize ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /**
+     * @throws IOException if the marshalled properties cannot be unmarshalled
+     */
+    public boolean isTightEncodingEnabled() throws IOException {
+        return Boolean.TRUE.equals(getProperty("TightEncodingEnabled")); // equals, not ==: any Boolean instance may be stored
+    }
+
+    public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
+        setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    /** Returns the "Host" property rendered as a String, or null when it is not set. */
+    public String getHost() throws IOException {
+        // setHost() stores a plain String, so the former unconditional cast to UTF8Buffer
+        // threw ClassCastException on a locally populated instance; toString() works for both.
+        Object host = getProperty("Host");
+        return host == null ? null : host.toString();
+    }
+
+    public void setHost(String hostname) throws IOException {
+        setProperty("Host", hostname);
+    }
+
+    /**
+     * @throws IOException
+     */
+    public long getMaxInactivityDuration() throws IOException {
+        Long l = (Long) getProperty("MaxInactivityDuration");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+        setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
+    }
+
+    public long getMaxInactivityDurationInitalDelay() throws IOException {
+        Long l = (Long) getProperty("MaxInactivityDurationInitalDelay");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
+        setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
+    }
+
+    public long getMaxFrameSize() throws IOException {
+        Long l = (Long) getProperty("MaxFrameSize");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxFrameSize(long maxFrameSize) throws IOException {
+        setProperty("MaxFrameSize", new Long(maxFrameSize));
+    }
+
+    /**
+     * @throws IOException
+     */
+    public int getCacheSize() throws IOException {
+        Integer i = (Integer) getProperty("CacheSize");
+        return i == null ? 0 : i.intValue();
+    }
+
+    public void setCacheSize(int cacheSize) throws IOException {
+        setProperty("CacheSize", new Integer(cacheSize));
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processWireFormat(this);
+    }
+
+    @Override
+    public String toString() {
+        Map<String, Object> p = null;
+        try {
+            p = getProperties();
+        } catch (IOException ignore) {
+        }
+        return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
+    }
+
+    /** Renders data as "[c,c,...]" using each byte as a char, or "null" when data is null (see setMagic). */
+    private String toString(byte[] data) {
+        if (data == null) {
+            return "null";
+        }
+        StringBuilder sb = new StringBuilder("[");
+        for (int i = 0; i < data.length; i++) {
+            sb.append(i == 0 ? "" : ",").append((char) data[i]);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    // /////////////////////////////////////////////////////////////
+    //
+    // These are not implemented.
+    //
+    // /////////////////////////////////////////////////////////////
+
+    @Override
+    public void setCommandId(int value) {
+    }
+
+    @Override
+    public int getCommandId() {
+        return 0;
+    }
+
+    @Override
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    @Override
+    public boolean isResponse() {
+        return false;
+    }
+
+    @Override
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessage() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    @Override
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    @Override
+    public boolean isConnectionControl() {
+        return false;
+    }
+
+    @Override
+    public boolean isConnectionInfo() {
+        return false;
+    }
+
+    @Override
+    public boolean isSessionInfo() {
+        return false;
+    }
+
+    @Override
+    public boolean isProducerInfo() {
+        return false;
+    }
+
+    @Override
+    public boolean isConsumerInfo() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java
new file mode 100644
index 0000000..8781386
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import javax.transaction.xa.Xid;
+
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+
+/**
+ * @openwire:marshaller code="112"
+ */
+public class XATransactionId extends TransactionId implements Xid, Comparable<XATransactionId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_XA_TRANSACTION_ID;
+
+    private int formatId;
+    private byte[] branchQualifier;
+    private byte[] globalTransactionId;
+    private transient DataByteArrayOutputStream outputStream;
+    private transient byte[] encodedXidBytes;
+
+    private transient int hash;
+    private transient String transactionKey;
+
+    public XATransactionId() { // fields keep their defaults (0 / null) until the setters are called
+    }
+
+    public XATransactionId(Xid xid) { // copies the three components of another Xid
+        this.formatId = xid.getFormatId();
+        this.globalTransactionId = xid.getGlobalTransactionId(); // stored as returned, no defensive copy
+        this.branchQualifier = xid.getBranchQualifier(); // stored as returned, no defensive copy
+    }
+
+    public XATransactionId(byte[] encodedBytes) throws IOException { // builds the id from its encoded form
+        encodedXidBytes = encodedBytes; // the caller's array becomes the cached encoding (no copy)
+        initFromEncodedBytes(); // fills formatId, globalTransactionId and branchQualifier from that array
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE; // i.e. CommandTypes.OPENWIRE_XA_TRANSACTION_ID
+    }
+
+    final int XID_PREFIX_SIZE = 16;
+    // Encoded layout: (byte)'+'|'-', (long)lastAck, (byte)priority, (int)formatId, (short)globalLength, then the global id bytes followed by the branch qualifier bytes.
+    /**
+     * Decodes {@link #encodedXidBytes} into the format id, the global
+     * transaction id and the branch qualifier.
+     * <p>
+     * The first 10 bytes are skipped. They are followed by the format id
+     * (int), the length of the global transaction id (short), the global
+     * transaction id itself and finally the branch qualifier, which takes up
+     * whatever remains of the array.
+     *
+     * @throws IOException if the array is missing, is shorter than the fixed
+     *         prefix, or declares a global transaction id length that does
+     *         not fit into the remaining bytes.
+     */
+    private void initFromEncodedBytes() throws IOException {
+        if (encodedXidBytes == null || encodedXidBytes.length < XID_PREFIX_SIZE) {
+            throw new IOException("Encoded XID is missing or shorter than the " + XID_PREFIX_SIZE + " byte prefix");
+        }
+
+        DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXidBytes);
+        try {
+            inputStream.skipBytes(10);
+            formatId = inputStream.readInt();
+            int globalLength = inputStream.readShort();
+
+            // Reject corrupt lengths up front instead of silently producing a
+            // partially filled global id and an empty branch qualifier.
+            if (globalLength < 0 || globalLength > inputStream.available()) {
+                throw new IOException("Encoded XID declares an invalid global transaction id length: " + globalLength);
+            }
+
+            globalTransactionId = new byte[globalLength];
+            try {
+                inputStream.read(globalTransactionId);
+                branchQualifier = new byte[inputStream.available()];
+                inputStream.read(branchQualifier);
+            } catch (IOException fatal) {
+                // The message deliberately avoids toString(): the object is only
+                // half initialised here and building its key could fail with a
+                // NullPointerException that would hide the real cause.
+                throw new RuntimeException("XATransactionId, failed to decode:", fatal);
+            }
+        } finally {
+            inputStream.close();
+        }
+    }
+
+    /**
+     * Returns the encoded form of this id, building and caching it on first use.
+     * <p>
+     * A freshly built buffer starts writing at offset 10, so its first 10
+     * bytes are left untouched by this method; it then holds the format id
+     * (int), the global transaction id length (short), the global
+     * transaction id and the branch qualifier.
+     *
+     * @return the cached encoded bytes; the array is shared, not copied.
+     */
+    public synchronized byte[] getEncodedXidBytes() {
+        if (encodedXidBytes != null) {
+            return encodedXidBytes;
+        }
+
+        final int capacity = XID_PREFIX_SIZE + globalTransactionId.length + branchQualifier.length;
+        outputStream = new DataByteArrayOutputStream(capacity);
+        try {
+            // Fixed-size header fields first, then the two variable length parts.
+            outputStream.position(10);
+            outputStream.writeInt(formatId);
+            outputStream.writeShort(globalTransactionId.length);
+            outputStream.write(globalTransactionId);
+            outputStream.write(branchQualifier);
+        } catch (IOException fatal) {
+            throw new RuntimeException(this + ", failed to encode:", fatal);
+        }
+
+        encodedXidBytes = outputStream.getData();
+        return encodedXidBytes;
+    }
+
+    public DataByteArrayOutputStream internalOutputStream() {
+        return outputStream; // null until getEncodedXidBytes() has had to build the encoding itself
+    }
+
+    /**
+     * Returns the textual key of this transaction id, of the form
+     * {@code XID:[<formatId>,globalId=<...>,branchId=<...>]}.
+     * The key is built once and then served from a cache.
+     */
+    @Override
+    public synchronized String getTransactionKey() {
+        if (transactionKey != null) {
+            return transactionKey;
+        }
+
+        StringBuilder key = new StringBuilder();
+        key.append("XID:[").append(formatId).append(",globalId=");
+        key.append(stringForm(formatId, globalTransactionId));
+        key.append(",branchId=");
+        key.append(stringForm(formatId, branchQualifier));
+        key.append("]");
+
+        transactionKey = key.toString();
+        return transactionKey;
+    }
+
+    /**
+     * Renders one id component as text, picking the layout by format id.
+     *
+     * @param format the XID format id; 131077 selects the Arjuna layout,
+     *               anything else the plain hex layout.
+     * @param uid    the bytes to render.
+     * @return the textual form of {@code uid}.
+     */
+    private String stringForm(int format, byte[] uid) {
+        StringBuffer text = new StringBuffer();
+        if (format == 131077) { // arjuna
+            stringFormArj(text, uid);
+        } else { // aries
+            stringFormDefault(text, uid);
+        }
+        return text.toString();
+    }
+
+    /**
+     * Appends every byte of {@code uid} as hexadecimal text.
+     * <p>
+     * NOTE(review): {@link Integer#toHexString(int)} neither zero-pads small
+     * values nor masks negative bytes (those come out as eight hex digits),
+     * so different byte sequences can produce the same text - confirm that
+     * callers do not rely on the key being unique.
+     */
+    private void stringFormDefault(StringBuffer s, byte[] uid) {
+        for (byte value : uid) {
+            s.append(Integer.toHexString(value));
+        }
+    }
+
+    private void stringFormArj(StringBuffer s, byte[] uid) {
+        DataByteArrayInputStream byteArrayInputStream = null;
+        try {
+            byteArrayInputStream = new DataByteArrayInputStream(uid);
+            s.append(Long.toString(byteArrayInputStream.readLong(), 16));
+            s.append(':');
+            s.append(Long.toString(byteArrayInputStream.readLong(), 16));
+            s.append(':');
+
+            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
+            s.append(':');
+            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
+            s.append(':');
+            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
+        } catch (Exception ignored) {
+            stringFormDefault(s, uid);
+        } finally {
+            try {
+                byteArrayInputStream.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getTransactionKey(); // same text as the (cached) transaction key
+    }
+
+    @Override
+    public boolean isXATransaction() {
+        return true; // constant for this class
+    }
+
+    @Override
+    public boolean isLocalTransaction() {
+        return false; // constant for this class
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    @Override
+    public int getFormatId() {
+        return formatId; // plain accessor for the stored format id
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    @Override
+    public byte[] getGlobalTransactionId() {
+        return globalTransactionId; // the internal array itself, not a copy
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    @Override
+    public byte[] getBranchQualifier() {
+        return branchQualifier; // the internal array itself, not a copy
+    }
+
+    /**
+     * Replaces the branch qualifier and discards the values that were derived
+     * from the previous one.
+     *
+     * @param branchQualifier the new branch qualifier; stored without copying.
+     */
+    public void setBranchQualifier(byte[] branchQualifier) {
+        this.branchQualifier = branchQualifier;
+        this.hash = 0;
+        // The cached key embeds the old value; drop it so that toString() and
+        // compareTo() do not keep reporting the previous id.
+        this.transactionKey = null;
+        // NOTE(review): encodedXidBytes is left alone because its first 10 bytes
+        // are not produced by this class - confirm whether it must be invalidated too.
+    }
+
+    /**
+     * Replaces the format id and discards the values that were derived from
+     * the previous one.
+     *
+     * @param formatId the new format id.
+     */
+    public void setFormatId(int formatId) {
+        this.formatId = formatId;
+        this.hash = 0;
+        // The cached key embeds the old value; drop it so that toString() and
+        // compareTo() do not keep reporting the previous id.
+        this.transactionKey = null;
+        // NOTE(review): encodedXidBytes is left alone because its first 10 bytes
+        // are not produced by this class - confirm whether it must be invalidated too.
+    }
+
+    /**
+     * Replaces the global transaction id and discards the values that were
+     * derived from the previous one.
+     *
+     * @param globalTransactionId the new global transaction id; stored without copying.
+     */
+    public void setGlobalTransactionId(byte[] globalTransactionId) {
+        this.globalTransactionId = globalTransactionId;
+        this.hash = 0;
+        // The cached key embeds the old value; drop it so that toString() and
+        // compareTo() do not keep reporting the previous id.
+        this.transactionKey = null;
+        // NOTE(review): encodedXidBytes is left alone because its first 10 bytes
+        // are not produced by this class - confirm whether it must be invalidated too.
+    }
+
+    /**
+     * Returns a hash built from the format id, the global transaction id and
+     * the branch qualifier. The value is cached; 0 marks "not computed yet",
+     * which is why a computed 0 is replaced by a fixed non-zero constant.
+     */
+    @Override
+    public int hashCode() {
+        if (hash != 0) {
+            return hash;
+        }
+
+        hash = formatId;
+        hash = hash(globalTransactionId, hash);
+        hash = hash(branchQualifier, hash);
+        if (hash == 0) {
+            hash = 0xaceace;
+        }
+        return hash;
+    }
+
+    /**
+     * Folds the given bytes into a running hash value. Each byte is shifted
+     * into one of four positions, chosen by its index, and XOR-ed in.
+     *
+     * @param bytes the bytes to fold in; {@code null} leaves the hash unchanged,
+     *              matching {@code equals}, which also accepts unset arrays.
+     * @param hash  the hash value accumulated so far.
+     * @return the updated hash value.
+     */
+    private static int hash(byte[] bytes, int hash) {
+        if (bytes == null) {
+            return hash;
+        }
+        for (int i = 0; i < bytes.length; i++) {
+            hash ^= bytes[i] << ((i % 4) * 8);
+        }
+        return hash;
+    }
+
+    /**
+     * Two ids are equal when the other object is exactly an
+     * {@code XATransactionId} and the format id, global transaction id and
+     * branch qualifier all match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || o.getClass() != XATransactionId.class) {
+            return false;
+        }
+
+        XATransactionId other = (XATransactionId) o;
+        if (other.formatId != formatId) {
+            return false;
+        }
+        if (!Arrays.equals(other.globalTransactionId, globalTransactionId)) {
+            return false;
+        }
+        return Arrays.equals(other.branchQualifier, branchQualifier);
+    }
+
+    /**
+     * Orders ids by their transaction key text. A {@code null} argument does
+     * not raise an exception; this id is then reported as the smaller one.
+     */
+    @Override
+    public int compareTo(XATransactionId xid) {
+        return xid == null ? -1 : getTransactionKey().compareTo(xid.getTransactionKey());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html
new file mode 100644
index 0000000..236b95c
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html
@@ -0,0 +1,24 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+  <head></head>
+  <body>
+    <p>
+    Represents the Object Model for the OpenWire protocol.
+    </p>
+  </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java
new file mode 100644
index 0000000..491f26d
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import javax.jms.MessageFormatException;
+
+public class CronParser {
+
+    private static final int NUMBER_TOKENS = 5;
+    private static final int MINUTES = 0;
+    private static final int HOURS = 1;
+    private static final int DAY_OF_MONTH = 2;
+    private static final int MONTH = 3;
+    private static final int DAY_OF_WEEK = 4;
+
+    /**
+     * Calculates the next time at which the given cron entry should fire.
+     * <p>
+     * The entry consists of five whitespace separated fields: minutes, hours,
+     * day of month, month and day of week. The search starts at the top of
+     * the minute following {@code currentTime}, and the returned time always
+     * falls on a whole minute (seconds and milliseconds are zero).
+     *
+     * @param cronEntry   the cron expression; may be {@code null} or empty.
+     * @param currentTime the reference time in milliseconds since the epoch.
+     * @return the next execution time in milliseconds since the epoch, or
+     *         {@code 0} when {@code cronEntry} is {@code null} or empty.
+     * @throws MessageFormatException declared by the cron field helpers this
+     *         method relies on.
+     * @throws IllegalArgumentException if the entry does not consist of exactly
+     *         five fields.
+     * @throws ArithmeticException if the computed time is not after
+     *         {@code currentTime}.
+     */
+    public static long getNextScheduledTime(final String cronEntry, long currentTime) throws MessageFormatException {
+
+        if (cronEntry == null || cronEntry.length() == 0) {
+            return 0;
+        }
+
+        // Handle the once per minute case "* * * * *"
+        // starting the next event at the top of the minute.
+        if (cronEntry.equals("* * * * *")) {
+            long nextMinute = currentTime + 60 * 1000;
+            return nextMinute / 60000 * 60000;
+        }
+
+        List<CronEntry> entries = buildCronEntries(tokenize(cronEntry));
+        CronEntry minutes = entries.get(MINUTES);
+        CronEntry hours = entries.get(HOURS);
+        CronEntry dayOfMonth = entries.get(DAY_OF_MONTH);
+        CronEntry month = entries.get(MONTH);
+        CronEntry dayOfWeek = entries.get(DAY_OF_WEEK);
+
+        // Truncate to the start of the current minute. The milliseconds must be
+        // cleared as well, otherwise they leak into the result and the event is
+        // no longer scheduled exactly on the minute.
+        Calendar working = Calendar.getInstance();
+        working.setTimeInMillis(currentTime);
+        working.set(Calendar.SECOND, 0);
+        working.set(Calendar.MILLISECOND, 0);
+
+        // Start at the top of the next minute, cron is only guaranteed to be
+        // run on the minute.
+        working.add(Calendar.MINUTE, 1);
+
+        // If it's already too late in the day this will roll us over to tomorrow
+        // so we'll need to check again when done updating month and day.
+        advanceToNextMatch(working, Calendar.MINUTE, minutes);
+        advanceToNextMatch(working, Calendar.HOUR_OF_DAY, hours);
+
+        // We can roll into the next month here which might violate the cron setting
+        // rules so we check once then recheck again after applying the month settings.
+        doUpdateCurrentDay(working, dayOfMonth, dayOfWeek);
+
+        // Start by checking if we are in the right month, if not then calculations
+        // need to start from the beginning of the month to ensure that we don't end
+        // up on the wrong day. (Can happen when DAY_OF_WEEK is set and current time
+        // is ahead of the day of the week to execute on).
+        doUpdateCurrentMonth(working, month);
+
+        // Now check day of week and day of month together since they can be specified
+        // together in one entry, if both "day of month" and "day of week" are restricted
+        // (not "*"), then either the "day of month" field (3) or the "day of week" field
+        // (5) must match the current day or the Calendar must be advanced.
+        doUpdateCurrentDay(working, dayOfMonth, dayOfWeek);
+
+        // Now we can choose the correct hour and minute of the day in question.
+        advanceToNextMatch(working, Calendar.HOUR_OF_DAY, hours);
+        advanceToNextMatch(working, Calendar.MINUTE, minutes);
+
+        long result = working.getTimeInMillis();
+        if (result <= currentTime) {
+            throw new ArithmeticException("Unable to compute next scheduled exection time.");
+        }
+
+        return result;
+    }
+
+    /**
+     * Advances one calendar field to the next value allowed by the cron entry,
+     * leaving the calendar untouched when its current value is already allowed.
+     */
+    private static void advanceToNextMatch(Calendar working, int calendarField, CronEntry entry) throws MessageFormatException {
+        int current = working.get(calendarField);
+        if (!isCurrent(entry, current)) {
+            working.add(calendarField, getNext(entry, current));
+        }
+    }
+
+    protected static long doUpdateCurrentMonth(Calendar working, CronEntry month) throws MessageFormatException {
+        int currentMonth = working.get(Calendar.MONTH) + 1;
+        if (!isCurrent(month, currentMonth)) {
+            int nextMonth = getNext(month, currentMonth);
+            working.add(Calendar.MONTH, nextMonth);
+
+            // Reset to start of month.
+            resetToStartOfDay(working, 1);
+
+            return working.getTimeInMillis();
+        }
+
+        return 0L;
+    }
+
+    protected static long doUpdateCurrentDay(Calendar working, CronEntry dayOfMonth, CronEntry dayOfWeek) throws MessageFormatException {
+
+        int currentDayOfWeek = working.get(Calendar.DAY_OF_WEEK) - 1;
+        int currentDayOfMonth = working.get(Calendar.DAY_OF_MONTH);
+
+        // Simplest case, both are unrestricted or both match today otherwise
+        // result must be the closer of the two if both are set, or the next
+        // match to the one that is.
+        if (!isCurrent(dayOfWeek, currentDayOfWeek) || !isCurrent(dayOfMonth, currentDayOfMonth)) {
+
+            int nextWeekDay = Integer.MAX_VALUE;
+            int nextCalendarDay = Integer.MAX_VALUE;
+
+            if (!isCurrent(dayOfWeek, currentDayOfWeek)) {
+                nextWeekDay = getNext(dayOfWeek, currentDayOfWeek);
+            }
+
+            if (!isCurrent(dayOfMonth, currentDayOfMonth)) {
+                nextCalendarDay = getNext(dayOfMonth, currentDayOfMonth);
+            }
+
+            if (nextWeekDay < nextCalendarDay) {
+                working.add(Calendar.DAY_OF_WEEK, nextWeekDay);
+            } else {
+                working.add(Calendar.DAY_OF_MONTH, nextCalendarDay);
+            }
+
+            // Since the day changed, we restart the clock at the start of the day
+            // so that the next time will either be at 12am + value of hours and
+            // minutes pattern.
+            resetToStartOfDay(working, working.get(Calendar.DAY_OF_MONTH));
+
+            return working.getTimeInMillis();
+        }
+
+        return 0L;
+    }
+
+    /**
+     * Checks every field of a cron expression against its allowed range.
+     *
+     * @param cronEntry the cron expression to check.
+     * @throws MessageFormatException if a field has no values or has values
+     *         outside its range.
+     * @throws IllegalArgumentException if the expression does not consist of
+     *         exactly five fields.
+     */
+    public static void validate(final String cronEntry) throws MessageFormatException {
+        for (CronEntry field : buildCronEntries(tokenize(cronEntry))) {
+            validate(field);
+        }
+    }
+
+    /**
+     * Checks that a parsed field has at least one value and that its smallest
+     * and largest values lie within the field's start and end bounds.
+     *
+     * @throws MessageFormatException if the check fails.
+     */
+    static void validate(final CronEntry entry) throws MessageFormatException {
+        List<Integer> values = entry.currentWhen;
+        // The list is sorted, so only its first and last element need checking.
+        boolean valid = !values.isEmpty()
+            && values.get(0).intValue() >= entry.start
+            && values.get(values.size() - 1).intValue() <= entry.end;
+        if (!valid) {
+            throw new MessageFormatException("Invalid token: " + entry);
+        }
+    }
+
+    /**
+     * Computes how far a field has to be advanced from {@code current} to
+     * reach the next value allowed by the cron entry.
+     * <p>
+     * When no allowed value lies after {@code current}, the distance wraps
+     * around the end of the field's range to the first allowed value.
+     *
+     * @param entry   the parsed cron field; its value list is calculated on
+     *                demand when it has not been set yet.
+     * @param current the field's current value.
+     * @return the amount to add to the field.
+     * @throws MessageFormatException if the entry does not allow any value,
+     *         for example a range or step that matches nothing.
+     */
+    static int getNext(final CronEntry entry, final int current) throws MessageFormatException {
+        if (entry.currentWhen == null) {
+            entry.currentWhen = calculateValues(entry);
+        }
+
+        List<Integer> list = entry.currentWhen;
+        if (list.isEmpty()) {
+            // Report the bad token through the declared exception instead of
+            // failing with an IndexOutOfBoundsException further down.
+            throw new MessageFormatException("Invalid token: " + entry);
+        }
+
+        // The list is sorted, so the first larger value is the closest one.
+        for (Integer candidate : list) {
+            if (candidate.intValue() > current) {
+                return candidate.intValue() - current;
+            }
+        }
+
+        // Nothing left in this cycle: wrap around to the first allowed value.
+        int result = entry.end + list.get(0).intValue() - entry.start - current;
+
+        // Account for difference of one vs zero based indices.
+        if (entry.name.equals("DayOfWeek") || entry.name.equals("Month")) {
+            result++;
+        }
+
+        return result;
+    }
+
+    /**
+     * Tells whether {@code current} is one of the values allowed by the cron
+     * entry.
+     *
+     * @param entry   the parsed cron field; its value list is calculated on
+     *                demand when it has not been set yet, as in {@code getNext}.
+     * @param current the value to look up.
+     * @return {@code true} when the entry allows {@code current}.
+     */
+    static boolean isCurrent(final CronEntry entry, final int current) throws MessageFormatException {
+        if (entry.currentWhen == null) {
+            entry.currentWhen = calculateValues(entry);
+        }
+        // Integer.valueOf replaces the deprecated Integer constructor.
+        return entry.currentWhen.contains(Integer.valueOf(current));
+    }
+
+    /**
+     * Moves the calendar to the very start (00:00:00.000) of the given day of
+     * the calendar's current month.
+     *
+     * @param target the calendar to modify.
+     * @param day    the day of month to move to.
+     */
+    protected static void resetToStartOfDay(Calendar target, int day) {
+        target.set(Calendar.DAY_OF_MONTH, day);
+        target.set(Calendar.HOUR_OF_DAY, 0);
+        target.set(Calendar.MINUTE, 0);
+        target.set(Calendar.SECOND, 0);
+        // Without this the milliseconds of the original time would survive
+        // and the "start of day" would be off by up to a second.
+        target.set(Calendar.MILLISECOND, 0);
+    }
+
+    /**
+     * Splits a cron expression into its whitespace separated fields.
+     *
+     * @param cron the cron expression.
+     * @return the five fields in the order they appear.
+     * @throws IllegalArgumentException if the expression does not consist of
+     *         exactly five fields.
+     */
+    static List<String> tokenize(String cron) throws IllegalArgumentException {
+        List<String> fields = new ArrayList<String>();
+        for (StringTokenizer scanner = new StringTokenizer(cron); scanner.hasMoreTokens();) {
+            fields.add(scanner.nextToken());
+        }
+        if (fields.size() == NUMBER_TOKENS) {
+            return fields;
+        }
+        throw new IllegalArgumentException("Not a valid cron entry - wrong number of tokens(" + fields.size() + "): " + cron);
+    }
+
+    protected static List<Integer> calculateValues(final CronEntry entry) {
+        List<Integer> result = new ArrayList<Integer>();
+        if (isAll(entry.token)) {
+            for (int i = entry.start; i <= entry.end; i++) {
+                result.add(i);
+            }
+        } else if (isAStep(entry.token)) {
+            int denominator = getDenominator(entry.token);
+            String numerator = getNumerator(entry.token);
+            CronEntry ce = new CronEntry(entry.name, numerator, entry.start, entry.end);
+            List<Integer> list = calculateValues(ce);
+            for (Integer i : list) {
+                if (i.intValue() % denominator == 0) {
+                    result.add(i);
+                }
+            }
+        } else if (isAList(entry.token)) {
+            StringTokenizer tokenizer = new StringTokenizer(entry.token, ",");
+            while (tokenizer.hasMoreTokens()) {
+                String str = tokenizer.nextToken();
+                CronEntry ce = new CronEntry(entry.name, str, entry.start, entry.end);
+                List<Integer> list = calculateValues(ce);
+                result.addAll(list);
+            }
+        } else if (isARange(entry.token)) {
+            int index = entry.token.indexOf('-');
+            int first = Integer.parseInt(entry.token.substring(0, index));
+            int last = Integer.parseInt(entry.token.substring(index + 1));
+            for (int i = first; i <= last; i++) {
+                result.add(i);
+            }
+        } else {
+            int value = Integer.parseInt(entry.token);
+            result.add(value);
+        }
+        Collections.sort(result);
+        return result;
+    }
+
+    /** Tells whether the token contains a '-' and is therefore treated as a range. */
+    protected static boolean isARange(String token) {
+        if (token == null) {
+            return false;
+        }
+        return token.indexOf('-') != -1;
+    }
+
+    /** Tells whether the token contains a '/' and is therefore treated as a step. */
+    protected static boolean isAStep(String token) {
+        if (token == null) {
+            return false;
+        }
+        return token.indexOf('/') != -1;
+    }
+
+    /** Tells whether the token contains a ',' and is therefore treated as a list. */
+    protected static boolean isAList(String token) {
+        if (token == null) {
+            return false;
+        }
+        return token.indexOf(',') != -1;
+    }
+
+    /** Tells whether the token is exactly one of the wildcards "*" or "?". */
+    protected static boolean isAll(String token) {
+        return "*".equals(token) || "?".equals(token);
+    }
+
+    /**
+     * Returns the step size of a step token, i.e. the number after the first '/'.
+     *
+     * @throws NumberFormatException if that part is not a number.
+     */
+    protected static int getDenominator(final String token) {
+        return Integer.parseInt(token.substring(token.indexOf('/') + 1));
+    }
+
+    /** Returns the part of a step token that precedes the first '/'. */
+    protected static String getNumerator(final String token) {
+        return token.substring(0, token.indexOf('/'));
+    }
+
+    static List<CronEntry> buildCronEntries(List<String> tokens) {
+
+        List<CronEntry> result = new ArrayList<CronEntry>();
+
+        CronEntry minutes = new CronEntry("Minutes", tokens.get(MINUTES), 0, 60);
+        minutes.currentWhen = calculateValues(minutes);
+        result.add(minutes);
+        CronEntry hours = new CronEntry("Hours", tokens.get(HOURS), 0, 24);
+        hours.currentWhen = calculateValues(hours);
+        result.add(hours);
+        CronEntry dayOfMonth = new CronEntry("DayOfMonth", tokens.get(DAY_OF_MONTH), 1, 31);
+        dayOfMonth.currentWhen = calculateValues(dayOfMonth);
+        result.add(dayOfMonth);
+        CronEntry month = new CronEntry("Month", tokens.get(MONTH), 1, 12);
+        month.currentWhen = calculateValues(month);
+        result.add(month);
+        CronEntry dayOfWeek = new CronEntry("DayOfWeek", tokens.get(DAY_OF_WEEK), 0, 6);
+        dayOfWeek.currentWhen = calculateValues(dayOfWeek);
+        result.add(dayOfWeek);
+
+        return result;
+    }
+
+    /**
+     * One field of a cron expression: its name, the raw token, the bounds of
+     * the field and the values the token expands to.
+     */
+    static class CronEntry {
+
+        /** Field name, e.g. "Minutes" or "DayOfWeek". */
+        final String name;
+        /** The raw text of the field as it appeared in the expression. */
+        final String token;
+        /** Lower bound of the field. */
+        final int start;
+        /** Upper bound of the field. */
+        final int end;
+
+        /** The values the token expands to; not set by the constructor. */
+        List<Integer> currentWhen;
+
+        CronEntry(String name, String token, int start, int end) {
+            this.name = name;
+            this.token = token;
+            this.start = start;
+            this.end = end;
+        }
+
+        @Override
+        public String toString() {
+            return new StringBuilder().append(name).append(":").append(token).toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java
new file mode 100644
index 0000000..e7c6bec
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.utils;
+
+import java.lang.reflect.Method;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.apache.activemq.openwire.commands.OpenWireDestination;
+import org.apache.activemq.openwire.commands.OpenWireQueue;
+import org.apache.activemq.openwire.commands.OpenWireTopic;
+
+/**
+ * A default implementation of the resolver that attempts to find an isQueue or isTopic method
+ * on the foreign destination to determine the correct type.
+ */
+public class DefaultUnresolvedDestinationTransformer implements UnresolvedDestinationTransformer {
+
+    @Override
+    public OpenWireDestination transform(Destination dest) throws JMSException {
+        String queueName = ((Queue) dest).getQueueName();
+        String topicName = ((Topic) dest).getTopicName();
+
+        if (queueName == null && topicName == null) {
+            throw new JMSException("Unresolvable destination: Both queue and topic names are null: " + dest);
+        }
+
+        try {
+            Method isQueueMethod = dest.getClass().getMethod("isQueue");
+            Method isTopicMethod = dest.getClass().getMethod("isTopic");
+
+            if (isQueueMethod == null && isTopicMethod == null) {
+                throw new JMSException("Unresolvable destination: Neither isQueue nor isTopic methods present: " + dest);
+            }
+
+            Boolean isQueue = (Boolean) isQueueMethod.invoke(dest);
+            Boolean isTopic = (Boolean) isTopicMethod.invoke(dest);
+            if (isQueue) {
+                return new OpenWireQueue(queueName);
+            } else if (isTopic) {
+                return new OpenWireTopic(topicName);
+            } else {
+                throw new JMSException("Unresolvable destination: Neither Queue nor Topic: " + dest);
+            }
+        } catch (Exception e) {
+            throw new JMSException("Unresolvable destination: " + e.getMessage() + ": " + dest);
+        }
+    }
+
+    @Override
+    public OpenWireDestination transform(String dest) throws JMSException {
+        return new OpenWireQueue(dest);
+    }
+}


Mime
View raw message