activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [37/51] [partial] https://issues.apache.org/jira/browse/OPENWIRE-1
Date Thu, 24 Jul 2014 14:23:26 GMT
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java
new file mode 100644
index 0000000..07789ac
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ExceptionResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** A {@link Response} that carries a {@link Throwable}; {@link #isException()} always returns true.
+ * @openwire:marshaller code="31"
+ */
+public class ExceptionResponse extends Response {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.EXCEPTION_RESPONSE;
+
+    Throwable exception; // stays null when the no-arg constructor is used and no setter is called
+
+    public ExceptionResponse() {
+    }
+
+    public ExceptionResponse(Throwable e) {
+        setException(e); // stored as given, no copy or wrapping
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /** Returns the stored throwable, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public Throwable getException() {
+        return exception;
+    }
+
+    public void setException(Throwable exception) {
+        this.exception = exception;
+    }
+
+    @Override
+    public boolean isException() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java
new file mode 100644
index 0000000..6625ff7
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/FlushCommand.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * An indication to the transport layer that a flush is required.
+ *
+ * @openwire:marshaller code="15"
+ */
+public class FlushCommand extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.FLUSH_COMMAND;
+    public static final Command COMMAND = new FlushCommand(); // pre-built instance exposed as a constant
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processFlush(this); // double dispatch: the visitor decides how a flush is handled
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java
new file mode 100644
index 0000000..f33dc02
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/IntegerResponse.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** A {@link Response} that carries a single {@code int} result.
+ * @openwire:marshaller code="34"
+ */
+public class IntegerResponse extends Response {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.INTEGER_RESPONSE;
+
+    int result; // 0 until set through the constructor or setResult
+
+    public IntegerResponse() {
+    }
+
+    public IntegerResponse(int result) {
+        this.result = result;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /** Returns the stored result value.
+     * @openwire:property version=1
+     */
+    public int getResult() {
+        return result;
+    }
+
+    public void setResult(int result) {
+        this.result = result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java
new file mode 100644
index 0000000..4594461
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalQueueAck.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** Data structure pairing an {@link OpenWireDestination} with a {@link MessageAck}.
+ * @openwire:marshaller code="52"
+ */
+public class JournalQueueAck implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_REMOVE;
+
+    OpenWireDestination destination;
+    MessageAck messageAck;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /** Returns the destination held by this record, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public OpenWireDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /** Returns the acknowledgement held by this record, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public MessageAck getMessageAck() {
+        return messageAck;
+    }
+
+    public void setMessageAck(MessageAck messageAck) {
+        this.messageAck = messageAck;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{ " + destination + " }"; // messageAck is not included
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java
new file mode 100644
index 0000000..032dff0
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTopicAck.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** Data structure holding a destination, client id, subscription name, message id, sequence id and transaction id.
+ * @openwire:marshaller code="50"
+ */
+public class JournalTopicAck implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_ACK;
+
+    protected OpenWireDestination destination;
+    protected String clientId;
+    protected String subscritionName; // NOTE(review): "subscrition" is misspelled, but the accessors below expose that spelling publicly
+    protected MessageId messageId;
+    protected long messageSequenceId;
+    protected TransactionId transactionId;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /** Returns the destination, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public OpenWireDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /** Returns the message id, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    /** Returns the message sequence id (0 until set).
+     * @openwire:property version=1
+     */
+    public long getMessageSequenceId() {
+        return messageSequenceId;
+    }
+
+    public void setMessageSequenceId(long messageSequenceId) {
+        this.messageSequenceId = messageSequenceId;
+    }
+
+    /** Returns the subscription name, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public String getSubscritionName() {
+        return subscritionName;
+    }
+
+    public void setSubscritionName(String subscritionName) {
+        this.subscritionName = subscritionName;
+    }
+
+    /** Returns the client id, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /** Returns the transaction id, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transaction) {
+        this.transactionId = transaction;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{ " + destination + " }"; // only the destination is printed
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java
new file mode 100644
index 0000000..ebc4746
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTrace.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** Data structure wrapping a single {@code String} message.
+ * @openwire:marshaller code="53"
+ */
+public class JournalTrace implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRACE;
+
+    private String message; // null when the no-arg constructor is used and setMessage is never called
+
+    public JournalTrace() {
+    }
+
+    public JournalTrace(String message) {
+        this.message = message;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /** Returns the trace message, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public String getMessage() {
+        return message;
+    }
+
+    /** Replaces the trace message.
+     * @openwire:property version=1
+     */
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " { " + message + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java
new file mode 100644
index 0000000..8df4ccf
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/JournalTransaction.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** Data structure holding a transaction id, a type byte and a "was prepared" flag.
+ * @openwire:marshaller code="54"
+ */
+public class JournalTransaction implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRANSACTION;
+
+    // Presumably the valid values for the type field; nothing in this class enforces that - TODO confirm.
+    public static final byte XA_PREPARE = 1;
+    public static final byte XA_COMMIT = 2;
+    public static final byte XA_ROLLBACK = 3;
+    public static final byte LOCAL_COMMIT = 4;
+    public static final byte LOCAL_ROLLBACK = 5;
+
+    public byte type; // NOTE(review): fields are public as well as reachable through the accessors below
+    public boolean wasPrepared;
+    public TransactionId transactionId;
+
+    public JournalTransaction(byte type, TransactionId transactionId, boolean wasPrepared) {
+        this.type = type;
+        this.transactionId = transactionId;
+        this.wasPrepared = wasPrepared;
+    }
+
+    public JournalTransaction() {
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /** Returns the transaction id, or null if none has been set.
+     * @openwire:property version=1
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /** Returns the type byte (0 until set).
+     * @openwire:property version=1
+     */
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    /** Returns the wasPrepared flag (false until set).
+     * @openwire:property version=1
+     */
+    public boolean getWasPrepared() {
+        return wasPrepared;
+    }
+
+    public void setWasPrepared(boolean wasPrepared) {
+        this.wasPrepared = wasPrepared;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " { " + transactionId + " }"; // type and wasPrepared are not printed
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java
new file mode 100644
index 0000000..270475b
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/KeepAliveInfo.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/** Field-less command that dispatches to {@code CommandVisitor.processKeepAlive}; every type predicate here returns false.
+ * @openwire:marshaller code="10"
+ */
+public class KeepAliveInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.KEEP_ALIVE_INFO;
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    // The is*() overrides below all answer false for this command type.
+    @Override
+    public boolean isResponse() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessage() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    @Override
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    @Override
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processKeepAlive(this); // double dispatch to the visitor's keep-alive handler
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    @Override
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    @Override
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName(); // there is no state to print beyond the class name
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java
new file mode 100644
index 0000000..cc49d63
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LastPartialCommand.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Represents the end marker of a stream of {@link PartialCommand} instances.
+ *
+ * @openwire:marshaller code="61"
+ */
+public class LastPartialCommand extends PartialCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_LAST_COMMAND;
+
+    public LastPartialCommand() {
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        throw new IllegalStateException("The transport layer should filter out LastPartialCommand instances but received: " + this); // never dispatched to the visitor
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java
new file mode 100644
index 0000000..ed5f5d4
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/LocalTransactionId.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Identifies a local (non-XA) transaction by a connection id plus a numeric
+ * value.
+ *
+ * @openwire:marshaller code="111"
+ */
+public class LocalTransactionId extends TransactionId implements Comparable<LocalTransactionId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_LOCAL_TRANSACTION_ID;
+
+    protected ConnectionId connectionId;
+    protected long value;
+
+    // Lazily computed caches; reset by the setters whenever the identity fields change.
+    private transient String transactionKey;
+    private transient int hashCode;
+
+    public LocalTransactionId() {
+    }
+
+    public LocalTransactionId(ConnectionId connectionId, long transactionId) {
+        this.connectionId = connectionId;
+        this.value = transactionId;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public boolean isXATransaction() {
+        return false;
+    }
+
+    @Override
+    public boolean isLocalTransaction() {
+        return true;
+    }
+
+    /**
+     * @return the key {@code "TX:" + connectionId + ":" + value}, built on first use and cached
+     */
+    @Override
+    public String getTransactionKey() {
+        if (transactionKey == null) {
+            transactionKey = "TX:" + connectionId + ":" + value;
+        }
+        return transactionKey;
+    }
+
+    @Override
+    public String toString() {
+        return getTransactionKey();
+    }
+
+    /**
+     * Combines the connection id's hash with the low 32 bits of the value. The
+     * result is cached; a computed hash of 0 is simply recomputed on each call.
+     */
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = connectionId.hashCode() ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    /**
+     * Two ids are equal when both are exactly {@code LocalTransactionId} instances
+     * with the same value and equal connection ids.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != LocalTransactionId.class) {
+            return false;
+        }
+        LocalTransactionId tx = (LocalTransactionId)o;
+        return value == tx.value && connectionId.equals(tx.connectionId);
+    }
+
+    /**
+     * Orders by connection id first and then by value.
+     *
+     * @param o the id to compare against
+     * @return a negative, zero or positive number as this id sorts before, equal to
+     *         or after {@code o}
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    @Override
+    public int compareTo(LocalTransactionId o) {
+        int result = connectionId.compareTo(o.connectionId);
+        if (result == 0) {
+            // Compare explicitly: narrowing (value - o.value) to int can flip the sign or yield 0.
+            result = value < o.value ? -1 : (value == o.value ? 0 : 1);
+        }
+        return result;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long transactionId) {
+        this.value = transactionId;
+        resetCachedState();
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+        resetCachedState();
+    }
+
+    /** Drops the cached key and hash so they are rebuilt from the current fields. */
+    private void resetCachedState() {
+        this.transactionKey = null;
+        this.hashCode = 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MarshallAware.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MarshallAware.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MarshallAware.java
new file mode 100644
index 0000000..26405f4
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MarshallAware.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import java.io.IOException;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+
+public interface MarshallAware { // four hooks, each handed the OpenWireFormat; presumably called by the codec around (un)marshalling - TODO confirm
+
+    void beforeMarshall(OpenWireFormat wireFormat) throws IOException;
+
+    void afterMarshall(OpenWireFormat wireFormat) throws IOException;
+
+    void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException;
+
+    void afterUnmarshall(OpenWireFormat wireFormat) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java
new file mode 100644
index 0000000..b691f93
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Message.java
@@ -0,0 +1,718 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import static org.apache.activemq.openwire.codec.OpenWireConstants.ADIVSORY_MESSAGE_TYPE;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.openwire.codec.OpenWireFormat;
+import org.apache.activemq.openwire.utils.ExceptionSupport;
+import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+
+/**
+ * Represents an ActiveMQ message
+ *
+ * @openwire:marshaller
+ */
+public abstract class Message extends BaseCommand implements MarshallAware {
+
+    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
+
+    /**
+     * The default minimum amount of memory a message is assumed to use
+     */
+    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
+
+    protected MessageId messageId;
+    protected OpenWireDestination originalDestination;
+    protected TransactionId originalTransactionId;
+
+    protected ProducerId producerId;
+    protected OpenWireDestination destination;
+    protected TransactionId transactionId;
+
+    protected long expiration;
+    protected long timestamp;
+    protected long arrival;
+    protected long brokerInTime;
+    protected long brokerOutTime;
+    protected String correlationId;
+    protected OpenWireDestination replyTo;
+    protected boolean persistent;
+    protected String type;
+    protected byte priority;
+    protected String groupId;
+    protected int groupSequence;
+    protected ConsumerId targetConsumerId;
+    protected boolean compressed;
+    protected String userId;
+
+    protected Buffer content;
+    protected Buffer marshalledProperties;
+    protected DataStructure dataStructure;
+    protected int redeliveryCounter;
+
+    protected int size;
+    protected Map<String, Object> properties;
+    protected transient boolean recievedByDFBridge;
+    protected boolean droppable;
+    protected boolean jmsXGroupFirstForConsumer;
+
+    private BrokerId[] brokerPath;
+    private BrokerId[] cluster;
+
+    public abstract Message copy();
+    public abstract void clearBody() throws JMSException;
+    public abstract void storeContent();
+    public abstract void storeContentAndClear();
+
+    /**
+     * Releases the unmarshalled property map; useful to reduce the memory
+     * footprint of a persisted message.
+     *
+     * The map is only discarded when a marshalled copy of the properties is
+     * held, so that it can be rebuilt lazily on the next property access.
+     * Properties that exist only in unmarshalled form are kept, because
+     * dropping them would silently lose them.
+     *
+     * @throws JMSException never thrown by this implementation
+     */
+    public void clearMarshalledState() throws JMSException {
+        if (marshalledProperties != null) {
+            properties = null;
+        }
+    }
+
+    /**
+     * Transfers the state of this message onto {@code copy}.
+     *
+     * The message id is duplicated through {@code MessageId.copy()} and the
+     * property map is cloned into a new {@link HashMap} (minus the
+     * {@link #ORIGINAL_EXPIRATION} entry); every other field is assigned as-is,
+     * so object references are shared between the two messages. The fields
+     * {@code targetConsumerId}, {@code droppable} and {@code cluster} are not
+     * transferred by this method.
+     *
+     * @param copy the message that receives this message's state
+     */
+    protected void copy(Message copy) {
+        super.copy(copy);
+
+        // Identity and routing information.
+        copy.messageId = messageId == null ? null : messageId.copy();
+        copy.producerId = producerId;
+        copy.destination = destination;
+        copy.originalDestination = originalDestination;
+        copy.transactionId = transactionId;
+        copy.originalTransactionId = originalTransactionId;
+        copy.replyTo = replyTo;
+        copy.brokerPath = brokerPath;
+
+        // Header values.
+        copy.correlationId = correlationId;
+        copy.type = type;
+        copy.priority = priority;
+        copy.persistent = persistent;
+        copy.groupId = groupId;
+        copy.groupSequence = groupSequence;
+        copy.userId = userId;
+        copy.redeliveryCounter = redeliveryCounter;
+        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
+
+        // Timing information.
+        copy.expiration = expiration;
+        copy.timestamp = timestamp;
+        copy.arrival = arrival;
+        copy.brokerInTime = brokerInTime;
+        copy.brokerOutTime = brokerOutTime;
+
+        // Unmarshalled properties get their own map so the copy can be edited independently.
+        if (properties == null) {
+            copy.properties = null;
+        } else {
+            Map<String, Object> cloned = new HashMap<String, Object>(properties);
+            // The new message hasn't expired, so remove this field.
+            cloned.remove(ORIGINAL_EXPIRATION);
+            copy.properties = cloned;
+        }
+
+        // Payload and marshalled state.
+        copy.content = content;
+        copy.marshalledProperties = marshalledProperties;
+        copy.dataStructure = dataStructure;
+        copy.compressed = compressed;
+        copy.size = size;
+        copy.recievedByDFBridge = recievedByDFBridge;
+    }
+
+    /**
+     * Returns a read-only view of the message properties.
+     *
+     * If the properties have not been unmarshalled yet they are decoded from
+     * {@code marshalledProperties} and cached. When the message has neither
+     * unmarshalled nor marshalled properties an immutable empty map is returned.
+     *
+     * @return an unmodifiable map of property names to values, never {@code null}
+     * @throws IOException if the marshalled properties cannot be decoded
+     */
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                // Type-safe replacement for the raw Collections.EMPTY_MAP constant.
+                return Collections.<String, Object>emptyMap();
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    public void clearProperties() throws JMSException {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    public Object getProperty(String name) throws JMSException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            try {
+                properties = unmarsallProperties(marshalledProperties);
+            } catch (IOException e) {
+                throw ExceptionSupport.create("Error during properties unmarshal, reason: " + e.getMessage(), e);
+            }
+        }
+        Object result = properties.get(name);
+        if (result instanceof UTF8Buffer) {
+            result = result.toString();
+        }
+
+        return result;
+    }
+
+    public void setProperty(String name, Object value) throws JMSException {
+        lazyCreateProperties();
+        properties.put(name, value);
+    }
+
+    public void removeProperty(String name) throws JMSException {
+        lazyCreateProperties();
+        properties.remove(name);
+    }
+
+    /**
+     * Ensures that a property map exists and discards the marshalled form of
+     * the properties.
+     *
+     * An absent map is rebuilt from {@code marshalledProperties} when those are
+     * available, otherwise an empty map is created. If unmarshalling fails the
+     * marshalled form is left in place.
+     *
+     * @throws JMSException if the marshalled properties cannot be decoded
+     */
+    protected void lazyCreateProperties() throws JMSException {
+        if (properties == null) {
+            if (marshalledProperties != null) {
+                try {
+                    properties = unmarsallProperties(marshalledProperties);
+                } catch (IOException ioe) {
+                    throw ExceptionSupport.create(
+                        "Error during properties unmarshal, reason: " + ioe.getMessage(), ioe);
+                }
+            } else {
+                properties = new HashMap<String, Object>();
+            }
+        }
+        // Reached only when a usable map exists; the marshalled form is dropped in every such case.
+        marshalledProperties = null;
+    }
+
+    private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
+        return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
+    }
+
+    /**
+     * Marshals the property map into {@code marshalledProperties} when a map
+     * is present and no marshalled form exists yet.
+     *
+     * @param wireFormat the wire format passed to the hook (not used here)
+     * @throws IOException if the properties cannot be encoded
+     */
+    @Override
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        if (marshalledProperties != null || properties == null) {
+            // Either already marshalled or nothing to marshal.
+            return;
+        }
+
+        ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(sink);
+        OpenWireMarshallingSupport.marshalPrimitiveMap(properties, out);
+        out.close();
+        marshalledProperties = sink.toBuffer();
+    }
+
+    @Override
+    public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    @Override
+    public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    @Override
+    public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public OpenWireDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public OpenWireDestination getOriginalDestination() {
+        return originalDestination;
+    }
+
+    public void setOriginalDestination(OpenWireDestination destination) {
+        this.originalDestination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getOriginalTransactionId() {
+        return originalTransactionId;
+    }
+
+    public void setOriginalTransactionId(TransactionId transactionId) {
+        this.originalTransactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupID(String groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getGroupSequence() {
+        return groupSequence;
+    }
+
+    public void setGroupSequence(int groupSequence) {
+        this.groupSequence = groupSequence;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean deliveryMode) {
+        this.persistent = deliveryMode;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getExpiration() {
+        return expiration;
+    }
+
+    public void setExpiration(long expiration) {
+        this.expiration = expiration;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getPriority() {
+        return priority;
+    }
+
+    /**
+     * Sets the message priority, clamping the value into the range 0 to 9.
+     *
+     * @param priority the requested priority; values below 0 are stored as 0
+     *        and values above 9 are stored as 9
+     */
+    public void setPriority(byte priority) {
+        this.priority = (byte) Math.max(0, Math.min(9, priority));
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public OpenWireDestination getReplyTo() {
+        return replyTo;
+    }
+
+    public void setReplyTo(OpenWireDestination replyTo) {
+        this.replyTo = replyTo;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getContent() {
+        return content;
+    }
+
+    public void setContent(Buffer content) {
+        this.content = content;
+        if (content == null) {
+            compressed = false;
+        }
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Buffer getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(Buffer marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public DataStructure getDataStructure() {
+        return dataStructure;
+    }
+
+    public void setDataStructure(DataStructure data) {
+        this.dataStructure = data;
+    }
+
+    /**
+     * Can be used to route the message to a specific consumer. Should be null
+     * to allow the broker to use normal JMS routing semantics. If the target
+     * consumer id is an active consumer on the broker, the message is dropped.
+     * Used by the AdvisoryBroker to replay advisory messages to a specific
+     * consumer.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getTargetConsumerId() {
+        return targetConsumerId;
+    }
+
+    public void setTargetConsumerId(ConsumerId targetConsumerId) {
+        this.targetConsumerId = targetConsumerId;
+    }
+
+    public boolean isExpired() {
+        long expireTime = getExpiration();
+        return expireTime > 0 && System.currentTimeMillis() > expireTime;
+    }
+
+    public boolean isAdvisory() {
+        return type != null && type.equals(ADIVSORY_MESSAGE_TYPE);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
+
+    public boolean isRedelivered() {
+        return redeliveryCounter > 0;
+    }
+
+    /**
+     * Adjusts the redelivery counter so that {@link #isRedelivered()} matches
+     * the given flag. The counter is only touched when the current state
+     * differs: it becomes 1 when marking as redelivered and 0 when clearing.
+     *
+     * @param redelivered whether the message should report itself as redelivered
+     */
+    public void setRedelivered(boolean redelivered) {
+        if (redelivered != isRedelivered()) {
+            setRedeliveryCounter(redelivered ? 1 : 0);
+        }
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getRedeliveryCounter() {
+        return redeliveryCounter;
+    }
+
+    public void setRedeliveryCounter(int deliveryCounter) {
+        this.redeliveryCounter = deliveryCounter;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    /**
+     * Used to schedule the arrival time of a message to a broker. The broker
+     * will not dispatch a message to a consumer until its arrival time has
+     * elapsed.
+     *
+     * @openwire:property version=1
+     */
+    public long getArrival() {
+        return arrival;
+    }
+
+    public void setArrival(long arrival) {
+        this.arrival = arrival;
+    }
+
+    /**
+     * Only set by the broker and defines the userID of the producer connection
+     * who sent this message. This is an optional field, it needs to be enabled
+     * on the broker to have this field populated.
+     *
+     * @openwire:property version=1
+     */
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String jmsxUserId) {
+        this.userId = jmsxUserId;
+    }
+
+    @Override
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    /**
+     * Returns the size recorded for this message, computing and caching an
+     * estimate when the recorded size is below
+     * {@link #DEFAULT_MINIMUM_MESSAGE_SIZE}.
+     *
+     * The estimate is the minimum message size plus the lengths of the
+     * marshalled properties and of the content, where present.
+     *
+     * @return the recorded or estimated size
+     */
+    public int getSize() {
+        if (size >= DEFAULT_MINIMUM_MESSAGE_SIZE) {
+            return size;
+        }
+
+        int estimate = DEFAULT_MINIMUM_MESSAGE_SIZE;
+        if (marshalledProperties != null) {
+            estimate += marshalledProperties.getLength();
+        }
+        if (content != null) {
+            estimate += content.getLength();
+        }
+        size = estimate;
+        return size;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the recievedByDFBridge.
+     */
+    public boolean isRecievedByDFBridge() {
+        return recievedByDFBridge;
+    }
+
+    /**
+     * @param recievedByDFBridge The recievedByDFBridge to set.
+     */
+    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
+        this.recievedByDFBridge = recievedByDFBridge;
+    }
+
+    /**
+     * @openwire:property version=2 cache=true
+     */
+    public boolean isDroppable() {
+        return droppable;
+    }
+
+    public void setDroppable(boolean droppable) {
+        this.droppable = droppable;
+    }
+
+    /**
+     * If a message is stored in multiple nodes on a cluster, all the cluster
+     * members will be listed here. Otherwise, it will be null.
+     *
+     * @openwire:property version=3 cache=true
+     */
+    public BrokerId[] getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(BrokerId[] cluster) {
+        this.cluster = cluster;
+    }
+
+    @Override
+    public boolean isMessage() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerInTime() {
+        return this.brokerInTime;
+    }
+
+    public void setBrokerInTime(long brokerInTime) {
+        this.brokerInTime = brokerInTime;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerOutTime() {
+        return this.brokerOutTime;
+    }
+
+    public void setBrokerOutTime(long brokerOutTime) {
+        this.brokerOutTime = brokerOutTime;
+    }
+
+    /**
+     * @openwire:property version=10
+     */
+    public boolean isJMSXGroupFirstForConsumer() {
+        return jmsXGroupFirstForConsumer;
+    }
+
+    public void setJMSXGroupFirstForConsumer(boolean val) {
+        jmsXGroupFirstForConsumer = val;
+    }
+
+    /**
+     * Compresses the message body if it is not compressed already.
+     *
+     * The current content is stored first via {@link #storeContent()}; the
+     * body is then compressed only if the message is still uncompressed and
+     * actually has content.
+     *
+     * @throws IOException if an error occurs during the compression process.
+     */
+    public void compress() throws IOException {
+        if (isCompressed()) {
+            return;
+        }
+
+        storeContent();
+
+        // The flag is re-read because storeContent() runs in between.
+        if (isCompressed() || getContent() == null) {
+            return;
+        }
+        doCompress();
+    }
+
+    /**
+     * Returns the message contents in uncompressed form. A compressed body is
+     * fully inflated into a new buffer; an uncompressed body is returned as-is.
+     *
+     * @return a Buffer instance that contains the message contents, uncompressed if needed.
+     *
+     * @throws IOException if an error occurs during decompression of the message contents.
+     */
+    public Buffer decompress() throws IOException {
+        return isCompressed() ? doDecompress() : content;
+    }
+
+    /**
+     * Inflates the current content into a new buffer. The content field itself
+     * is not modified.
+     *
+     * @return a buffer holding the inflated bytes
+     * @throws IOException if the content cannot be inflated
+     */
+    protected Buffer doDecompress() throws IOException {
+        InflaterInputStream inflater = new InflaterInputStream(
+            new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength()));
+        ByteArrayOutputStream inflated = new ByteArrayOutputStream();
+
+        try {
+            byte[] chunk = new byte[8*1024];
+            for (int count = inflater.read(chunk); count != -1; count = inflater.read(chunk)) {
+                inflated.write(chunk, 0, count);
+            }
+        } finally {
+            inflater.close();
+            inflated.close();
+        }
+
+        return inflated.toBuffer();
+    }
+
+    /**
+     * Deflates the current content and replaces it with the compressed bytes.
+     *
+     * The {@code compressed} flag is only set once the compressed content has
+     * been installed, so a failure during compression leaves the message
+     * flagged as uncompressed with its original content. The deflater stream
+     * is closed even when writing fails.
+     *
+     * @throws IOException if the content cannot be compressed
+     */
+    protected void doCompress() throws IOException {
+        Buffer bytes = getContent();
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        OutputStream os = new DeflaterOutputStream(bytesOut);
+        try {
+            os.write(bytes.getData(), bytes.getOffset(), bytes.getLength());
+        } finally {
+            // close() also finishes the deflater, flushing the remaining compressed bytes.
+            os.close();
+        }
+        setContent(bytesOut.toBuffer());
+        compressed = true;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " { " + messageId + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java
new file mode 100644
index 0000000..783b048
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageAck.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * @openwire:marshaller code="22"
+ */
+public class MessageAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
+
+    /**
+     * Used to let the broker know that the message has been delivered to the client. The message
+     * will still be retained until a standard ack is received. This is used to get the broker to
+     * send more messages past prefetch limits when a standard ack has not been sent.
+     */
+    public static final byte DELIVERED_ACK_TYPE = 0;
+
+    /**
+     * The standard ack case where a client wants the message to be discarded.
+     */
+    public static final byte STANDARD_ACK_TYPE = 2;
+
+    /**
+     * In case the client wants to explicitly let the broker know that a message was not
+     * processed and the message was considered a poison message.
+     */
+    public static final byte POSION_ACK_TYPE = 1;
+
+    /**
+     * In case the client wants to explicitly let the broker know that a message was not
+     * processed and it was re-delivered to the consumer but it was not yet considered to be a
+     * poison message. The messageCount field will hold the number of times the message was
+     * re-delivered.
+     */
+    public static final byte REDELIVERED_ACK_TYPE = 3;
+
+    /**
+     * The ack case where a client wants only an individual message to be discarded.
+     */
+    public static final byte INDIVIDUAL_ACK_TYPE = 4;
+
+    /**
+     * The ack case where a durable topic subscription does not match a selector.
+     */
+    public static final byte UNMATCHED_ACK_TYPE = 5;
+
+    /**
+     * The case where a consumer does not dispatch because the message has expired in flight.
+     */
+    public static final byte EXPIRED_ACK_TYPE = 6;
+
+    protected byte ackType;
+    protected ConsumerId consumerId;
+    protected MessageId firstMessageId;
+    protected MessageId lastMessageId;
+    protected OpenWireDestination destination;
+    protected TransactionId transactionId;
+    protected int messageCount;
+    protected Throwable poisonCause;
+
+    protected transient String consumerKey;
+
+    public MessageAck() {
+    }
+
+    public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
+        this.ackType = ackType;
+        this.consumerId = md.getConsumerId();
+        this.destination = md.getDestination();
+        this.lastMessageId = md.getMessage().getMessageId();
+        this.messageCount = messageCount;
+    }
+
+    public MessageAck(Message message, byte ackType, int messageCount) {
+        this.ackType = ackType;
+        this.destination = message.getDestination();
+        this.lastMessageId = message.getMessageId();
+        this.messageCount = messageCount;
+    }
+
+    /**
+     * Transfers the state of this acknowledgement onto {@code copy}.
+     *
+     * Besides the base command state, the ack type, consumer, destination,
+     * transaction, first and last message ids, message count and poison cause
+     * are copied, so the copy acknowledges the same range as the original.
+     * The transient {@code consumerKey} is not copied.
+     *
+     * @param copy the acknowledgement that receives this instance's state
+     */
+    public void copy(MessageAck copy) {
+        super.copy(copy);
+        copy.firstMessageId = firstMessageId;
+        copy.lastMessageId = lastMessageId;
+        copy.destination = destination;
+        copy.transactionId = transactionId;
+        copy.ackType = ackType;
+        copy.consumerId = consumerId;
+        copy.messageCount = messageCount;
+        copy.poisonCause = poisonCause;
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    @Override
+    public boolean isMessageAck() {
+        return true;
+    }
+
+    public boolean isPoisonAck() {
+        return ackType == POSION_ACK_TYPE;
+    }
+
+    public boolean isStandardAck() {
+        return ackType == STANDARD_ACK_TYPE;
+    }
+
+    public boolean isDeliveredAck() {
+        return ackType == DELIVERED_ACK_TYPE;
+    }
+
+    public boolean isRedeliveredAck() {
+        return ackType == REDELIVERED_ACK_TYPE;
+    }
+
+    public boolean isIndividualAck() {
+        return ackType == INDIVIDUAL_ACK_TYPE;
+    }
+
+    public boolean isUnmatchedAck() {
+        return ackType == UNMATCHED_ACK_TYPE;
+    }
+
+    public boolean isExpiredAck() {
+        return ackType == EXPIRED_ACK_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public OpenWireDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getAckType() {
+        return ackType;
+    }
+
+    public void setAckType(byte ackType) {
+        this.ackType = ackType;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getFirstMessageId() {
+        return firstMessageId;
+    }
+
+    public void setFirstMessageId(MessageId firstMessageId) {
+        this.firstMessageId = firstMessageId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getLastMessageId() {
+        return lastMessageId;
+    }
+
+    public void setLastMessageId(MessageId lastMessageId) {
+        this.lastMessageId = lastMessageId;
+    }
+
+    /**
+     * The number of messages being acknowledged in the range.
+     *
+     * @openwire:property version=1
+     */
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    /**
+     * The cause of a poison ack, if a message listener throws an exception it will be recorded
+     * here
+     *
+     * @openwire:property version=7
+     */
+    public Throwable getPoisonCause() {
+        return poisonCause;
+    }
+
+    public void setPoisonCause(Throwable poisonCause) {
+        this.poisonCause = poisonCause;
+    }
+
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageAck(this);
+    }
+
+    /**
+     * A helper method to allow a single message ID to be acknowledged
+     */
+    public void setMessageID(MessageId messageID) {
+        setFirstMessageId(messageID);
+        setLastMessageId(messageID);
+        setMessageCount(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java
new file mode 100644
index 0000000..6dfe04d
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatch.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Command that carries a {@link Message} together with the consumer id and
+ * destination it is dispatched with, plus a redelivery counter.
+ *
+ * @openwire:marshaller code="21"
+ */
+public class MessageDispatch extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
+
+    protected ConsumerId consumerId;
+    protected OpenWireDestination destination;
+    protected Message message;
+    protected int redeliveryCounter;
+
+    /**
+     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}
+     */
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always {@code true} for this command type
+     */
+    @Override
+    public boolean isMessageDispatch() {
+        return true;
+    }
+
+    /**
+     * @return the id of the consumer associated with this dispatch
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerId;
+    }
+
+    /**
+     * @param consumerId the id of the consumer associated with this dispatch
+     */
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @return the destination associated with this dispatch
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public OpenWireDestination getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @param destination the destination associated with this dispatch
+     */
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @return the message carried by this dispatch
+     *
+     * @openwire:property version=1
+     */
+    public Message getMessage() {
+        return this.message;
+    }
+
+    /**
+     * @param message the message carried by this dispatch
+     */
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the redelivery counter recorded on this dispatch
+     *
+     * @openwire:property version=1
+     */
+    public int getRedeliveryCounter() {
+        return this.redeliveryCounter;
+    }
+
+    /**
+     * @param deliveryCounter the redelivery counter to record on this dispatch
+     */
+    public void setRedeliveryCounter(int deliveryCounter) {
+        this.redeliveryCounter = deliveryCounter;
+    }
+
+    /**
+     * Hands this command to {@link CommandVisitor#processMessageDispatch}.
+     *
+     * @param visitor the visitor that processes this command
+     * @return the response produced by the visitor
+     * @throws Exception if the visitor fails
+     */
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageDispatch(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java
new file mode 100644
index 0000000..91ae1aa
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageDispatchNotification.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Command that carries a consumer id, a destination, a message id and a
+ * delivery sequence id.
+ *
+ * @openwire:marshaller code="90"
+ */
+public class MessageDispatchNotification extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
+
+    protected ConsumerId consumerId;
+    protected OpenWireDestination destination;
+    protected MessageId messageId;
+    protected long deliverySequenceId;
+
+    /**
+     * @return the type code {@link #DATA_STRUCTURE_TYPE}
+     */
+    @Override
+    public byte getDataStructureType() {
+        return MessageDispatchNotification.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isMessageDispatchNotification() {
+        return true;
+    }
+
+    /**
+     * Returns the consumer id stored on this notification.
+     *
+     * @openwire:property version=1 cache=true
+     * @return the stored consumer id, or {@code null} if none has been set
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerId;
+    }
+
+    /**
+     * Stores the given consumer id on this notification.
+     *
+     * @param consumerId the consumer id to store
+     */
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * Returns the destination stored on this notification.
+     *
+     * @openwire:property version=1 cache=true
+     * @return the stored destination, or {@code null} if none has been set
+     */
+    public OpenWireDestination getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * Stores the given destination on this notification.
+     *
+     * @param destination the destination to store
+     */
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * Returns the delivery sequence id stored on this notification.
+     *
+     * @openwire:property version=1
+     * @return the stored delivery sequence id
+     */
+    public long getDeliverySequenceId() {
+        return this.deliverySequenceId;
+    }
+
+    /**
+     * Stores the given delivery sequence id on this notification.
+     *
+     * @param deliverySequenceId the delivery sequence id to store
+     */
+    public void setDeliverySequenceId(long deliverySequenceId) {
+        this.deliverySequenceId = deliverySequenceId;
+    }
+
+    /**
+     * Returns the message id stored on this notification.
+     *
+     * @openwire:property version=1
+     * @return the stored message id, or {@code null} if none has been set
+     */
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    /**
+     * Stores the given message id on this notification.
+     *
+     * @param messageId the message id to store
+     */
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    /**
+     * Hands this command to
+     * {@code visitor.processMessageDispatchNotification(this)}.
+     *
+     * @param visitor the visitor to dispatch to
+     * @return the response produced by the visitor
+     * @throws Exception if the visitor throws
+     */
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageDispatchNotification(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java
new file mode 100644
index 0000000..4f3b13e
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessageId.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+
+/**
+ * Identifies a message by the id of its producer plus a producer sequence
+ * number. Equality and hashing use exactly those two values; the broker
+ * sequence id and the text view take no part in either.
+ *
+ * @openwire:marshaller code="110"
+ */
+public class MessageId implements DataStructure, Comparable<MessageId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
+
+    protected String textView;
+    protected ProducerId producerId;
+    protected long producerSequenceId;
+    protected long brokerSequenceId;
+
+    // Lazily computed caches: the string form returned by toString() and the hash code
+    // (0 means "not computed yet").
+    private transient String key;
+    private transient int hashCode;
+
+    /**
+     * Creates an id with a fresh, empty {@link ProducerId} and sequence id 0.
+     */
+    public MessageId() {
+        this.producerId = new ProducerId();
+    }
+
+    /**
+     * Creates an id from the producer id of the given producer info.
+     *
+     * @param producerInfo source of the producer id; must not be {@code null}
+     * @param producerSequenceId the producer sequence id
+     */
+    public MessageId(ProducerInfo producerInfo, long producerSequenceId) {
+        this.producerId = producerInfo.getProducerId();
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    /**
+     * Creates an id by parsing the given key, see {@link #setValue(String)}.
+     *
+     * @param messageKey the key to parse
+     */
+    public MessageId(String messageKey) {
+        setValue(messageKey);
+    }
+
+    /**
+     * Creates an id from the string form of a producer id.
+     *
+     * @param producerId string handed to the {@link ProducerId} constructor
+     * @param producerSequenceId the producer sequence id
+     */
+    public MessageId(String producerId, long producerSequenceId) {
+        this(new ProducerId(producerId), producerSequenceId);
+    }
+
+    /**
+     * Creates an id from the given producer id and sequence id.
+     *
+     * @param producerId the producer id
+     * @param producerSequenceId the producer sequence id
+     */
+    public MessageId(ProducerId producerId, long producerSequenceId) {
+        this.producerId = producerId;
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    /**
+     * Sets the value as a String. The text after the last ':' is parsed as the
+     * producer sequence id, the text before it is handed to the
+     * {@link ProducerId} constructor.
+     *
+     * @param messageKey the key to parse; must not be {@code null}
+     * @throws NumberFormatException if the key contains no ':' or the text
+     *         after the last ':' is not a valid long
+     */
+    public void setValue(String messageKey) {
+        // Parse off the sequenceId
+        int p = messageKey.lastIndexOf(":");
+        if (p < 0) {
+            throw new NumberFormatException("Message key has no ':' separated sequence id: " + messageKey);
+        }
+
+        // Parse everything before touching any field so a malformed key cannot
+        // leave this id half-updated.
+        long parsedSequenceId = Long.parseLong(messageKey.substring(p + 1));
+        ProducerId parsedProducerId = new ProducerId(messageKey.substring(0, p));
+
+        key = messageKey;
+        producerSequenceId = parsedSequenceId;
+        producerId = parsedProducerId;
+        // The identity changed, so the cached hash is stale.
+        hashCode = 0;
+    }
+
+    /**
+     * Sets the transient text view of the message which will be ignored if the message is marshaled on a transport; so
+     * is only for in-JVM changes to accommodate foreign JMS message IDs
+     *
+     * @param key the text view to store
+     */
+    public void setTextView(String key) {
+        this.textView = key;
+    }
+
+    /**
+     * @openwire:property version=10
+     * @return the text view, or {@code null} if none has been set
+     */
+    public String getTextView() {
+        return textView;
+    }
+
+    /**
+     * @return the type code {@link #DATA_STRUCTURE_TYPE}
+     */
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Two ids are equal when they are of the same class and have equal
+     * producer ids and producer sequence ids.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        MessageId id = (MessageId) o;
+        return producerSequenceId == id.producerSequenceId && producerId.equals(id.producerId);
+    }
+
+    /**
+     * Hash over the same two values that {@link #equals(Object)} compares.
+     * The text view is deliberately left out: equals() ignores it, so
+     * hashing it would give equal ids different hash codes.
+     */
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = producerId.hashCode() ^ (int) producerSequenceId;
+        }
+        return hashCode;
+    }
+
+    /**
+     * @return {@link #toString()} when no text view is set, otherwise the
+     *         producer id and producer sequence id joined by ':'
+     */
+    public String toProducerKey() {
+        if (textView == null) {
+            return toString();
+        } else {
+            return producerId.toString() + ":" + producerSequenceId;
+        }
+    }
+
+    /**
+     * Returns the cached string form, computing it on first use: the text
+     * view (prefixed with "ID:" unless it already starts with it) when one is
+     * set, otherwise the producer id and producer sequence id joined by ':'.
+     */
+    @Override
+    public String toString() {
+        if (key == null) {
+            if (textView != null) {
+                if (textView.startsWith("ID:")) {
+                    key = textView;
+                } else {
+                    key = "ID:" + textView;
+                }
+            } else {
+                key = producerId.toString() + ":" + producerSequenceId;
+            }
+        }
+        return key;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     * @return the producer id
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    /**
+     * @param producerId the new producer id
+     */
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+        // The producer id is part of the hash; drop the cached value.
+        this.hashCode = 0;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the producer sequence id
+     */
+    public long getProducerSequenceId() {
+        return producerSequenceId;
+    }
+
+    /**
+     * @param producerSequenceId the new producer sequence id
+     */
+    public void setProducerSequenceId(long producerSequenceId) {
+        this.producerSequenceId = producerSequenceId;
+        // The sequence id is part of the hash; drop the cached value.
+        this.hashCode = 0;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the broker sequence id
+     */
+    public long getBrokerSequenceId() {
+        return brokerSequenceId;
+    }
+
+    /**
+     * @param brokerSequenceId the new broker sequence id
+     */
+    public void setBrokerSequenceId(long brokerSequenceId) {
+        this.brokerSequenceId = brokerSequenceId;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * Creates a new id with the same producer id reference, producer and
+     * broker sequence ids, text view and cached string form as this one.
+     *
+     * @return the copy
+     */
+    public MessageId copy() {
+        MessageId copy = new MessageId(producerId, producerSequenceId);
+        copy.key = key;
+        copy.brokerSequenceId = brokerSequenceId;
+        copy.textView = textView;
+        return copy;
+    }
+
+    /**
+     * Orders ids by their {@link #toString()} form.
+     *
+     * @param other the id to compare with
+     * @return the result of comparing the two string forms, or -1 when
+     *         {@code other} is {@code null}
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    @Override
+    public int compareTo(MessageId other) {
+        int result = -1;
+        if (other != null) {
+            result = this.toString().compareTo(other.toString());
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java
new file mode 100644
index 0000000..29f5c01
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/MessagePull.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+/**
+ * Used to pull messages on demand, the command can have a time value that indicates
+ * how long the Broker keeps the pull request open before returning a MessageDispatch
+ * with a null payload.
+ *
+ * @openwire:marshaller code="20"
+ */
+public class MessagePull extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
+
+    protected ConsumerId consumerId;
+    protected OpenWireDestination destination;
+    protected long timeout;
+    private MessageId messageId;
+    private String correlationId;
+
+    // Starts out false (the default value of a boolean field).
+    private transient boolean tracked;
+
+    /**
+     * @return the type code {@link #DATA_STRUCTURE_TYPE}
+     */
+    @Override
+    public byte getDataStructureType() {
+        return MessagePull.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Hands this command to {@code visitor.processMessagePull(this)}.
+     *
+     * @param visitor the visitor to dispatch to
+     * @return the response produced by the visitor
+     * @throws Exception if the visitor throws
+     */
+    @Override
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessagePull(this);
+    }
+
+    /**
+     * Copies the consumer id and the destination of the given consumer
+     * information onto this pull command.
+     *
+     * @param info the consumer information to read from
+     */
+    public void configure(ConsumerInfo info) {
+        this.setConsumerId(info.getConsumerId());
+        this.setDestination(info.getDestination());
+    }
+
+    /**
+     * Returns the consumer id stored on this command.
+     *
+     * @openwire:property version=1 cache=true
+     * @return the stored consumer id, or {@code null} if none has been set
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerId;
+    }
+
+    /**
+     * @param consumerId the consumer id to store
+     */
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * Returns the destination stored on this command.
+     *
+     * @openwire:property version=1 cache=true
+     * @return the stored destination, or {@code null} if none has been set
+     */
+    public OpenWireDestination getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @param destination the destination to store
+     */
+    public void setDestination(OpenWireDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * Returns the timeout stored on this command.
+     *
+     * @openwire:property version=1
+     * @return the stored timeout value
+     */
+    public long getTimeout() {
+        return this.timeout;
+    }
+
+    /**
+     * @param timeout the timeout value to store
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * An optional correlation ID which could be used by a broker to decide which messages are pulled
+     * on demand from a queue for a consumer
+     *
+     * @openwire:property version=3
+     * @return the stored correlation id, or {@code null} if none has been set
+     */
+    public String getCorrelationId() {
+        return this.correlationId;
+    }
+
+    /**
+     * @param correlationId the correlation id to store
+     */
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+    /**
+     * An optional message ID which could be used by a broker to decide which messages are pulled
+     * on demand from a queue for a consumer
+     *
+     * @openwire:property version=3
+     * @return the stored message id, or {@code null} if none has been set
+     */
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    /**
+     * @param messageId the message id to store
+     */
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    /**
+     * @param tracked the new value of the transient tracked flag
+     */
+    public void setTracked(boolean tracked) {
+        this.tracked = tracked;
+    }
+
+    /**
+     * @return the current value of the transient tracked flag
+     */
+    public boolean isTracked() {
+        return tracked;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java
new file mode 100644
index 0000000..f39286a
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/NetworkBridgeFilter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data structure holding a network broker id together with a message TTL and
+ * a consumer TTL.
+ *
+ * @openwire:marshaller code="91"
+ */
+public class NetworkBridgeFilter implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
+
+    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
+
+    protected BrokerId networkBrokerId;
+    protected int messageTTL;
+    protected int consumerTTL;
+
+    /**
+     * Creates a filter with no broker id and both TTL values at 0.
+     */
+    public NetworkBridgeFilter() {
+    }
+
+    /**
+     * Creates a filter from the given broker id and TTL values.
+     *
+     * @param consumerInfo not used by this constructor
+     * @param networkBrokerId the network broker id to store
+     * @param messageTTL the message TTL to store
+     * @param consumerTTL the consumer TTL to store
+     */
+    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
+        setNetworkBrokerId(networkBrokerId);
+        setMessageTTL(messageTTL);
+        setConsumerTTL(consumerTTL);
+    }
+
+    /**
+     * @return the type code {@link #DATA_STRUCTURE_TYPE}
+     */
+    @Override
+    public byte getDataStructureType() {
+        return NetworkBridgeFilter.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    // keep for backward compat with older wire formats
+    /**
+     * @return the message TTL
+     */
+    public int getNetworkTTL() {
+        return this.messageTTL;
+    }
+
+    /**
+     * Sets both the message TTL and the consumer TTL to the given value.
+     *
+     * @param networkTTL the value to use for both TTLs
+     */
+    public void setNetworkTTL(int networkTTL) {
+        this.messageTTL = networkTTL;
+        this.consumerTTL = networkTTL;
+    }
+
+    /**
+     * Returns the network broker id stored on this filter.
+     *
+     * @openwire:property version=1 cache=true
+     * @return the stored broker id, or {@code null} if none has been set
+     */
+    public BrokerId getNetworkBrokerId() {
+        return this.networkBrokerId;
+    }
+
+    /**
+     * @param remoteBrokerPath the broker id to store as the network broker id
+     */
+    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
+        this.networkBrokerId = remoteBrokerPath;
+    }
+
+    /**
+     * @param messageTTL the message TTL to store
+     */
+    public void setMessageTTL(int messageTTL) {
+        this.messageTTL = messageTTL;
+    }
+
+    /**
+     * Returns the message TTL stored on this filter.
+     *
+     * @openwire:property version=10
+     * @return the stored message TTL
+     */
+    public int getMessageTTL() {
+        return messageTTL;
+    }
+
+    /**
+     * @param consumerTTL the consumer TTL to store
+     */
+    public void setConsumerTTL(int consumerTTL) {
+        this.consumerTTL = consumerTTL;
+    }
+
+    /**
+     * Returns the consumer TTL stored on this filter.
+     *
+     * @openwire:property version=10
+     * @return the stored consumer TTL
+     */
+    public int getConsumerTTL() {
+        return consumerTTL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java
----------------------------------------------------------------------
diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java
new file mode 100644
index 0000000..1c3f191
--- /dev/null
+++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/OpenWireBlobMessage.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire.commands;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+
+/**
+ * An implementation of ActiveMQ's BlobMessage for out of band BLOB transfer
+ *
+ * openwire:marshaller code="29"
+ */
+public class OpenWireBlobMessage extends OpenWireMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_BLOB_MESSAGE;
+
+    public static final String BINARY_MIME_TYPE = "application/octet-stream";
+
+    private String remoteBlobUrl;
+    private String mimeType;
+    private String name;
+    private boolean deletedByBroker;
+
+    private transient URL url;
+
+    @Override
+    public OpenWireBlobMessage copy() {
+        OpenWireBlobMessage copy = new OpenWireBlobMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(OpenWireBlobMessage copy) {
+        super.copy(copy);
+        copy.setRemoteBlobUrl(getRemoteBlobUrl());
+        copy.setMimeType(getMimeType());
+        copy.setDeletedByBroker(isDeletedByBroker());
+        copy.setName(getName());
+    }
+
+    @Override
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=3 cache=false
+     */
+    public String getRemoteBlobUrl() {
+        return remoteBlobUrl;
+    }
+
+    public void setRemoteBlobUrl(String remoteBlobUrl) {
+        this.remoteBlobUrl = remoteBlobUrl;
+        url = null;
+    }
+
+    /**
+     * The MIME type of the BLOB which can be used to apply different content types to messages.
+     *
+     * @openwire:property version=3 cache=true
+     */
+    @Override
+    public String getMimeType() {
+        if (mimeType == null) {
+            return BINARY_MIME_TYPE;
+        }
+        return mimeType;
+    }
+
+    public void setMimeType(String mimeType) {
+        this.mimeType = mimeType;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * The name of the attachment which can be useful information if transmitting files over
+     * ActiveMQ
+     *
+     * @openwire:property version=3 cache=false
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @openwire:property version=3 cache=false
+     */
+    public boolean isDeletedByBroker() {
+        return deletedByBroker;
+    }
+
+    public void setDeletedByBroker(boolean deletedByBroker) {
+        this.deletedByBroker = deletedByBroker;
+    }
+
+    public InputStream getInputStream() throws IOException, JMSException {
+        return null;
+    }
+
+    public URL getURL() throws JMSException {
+        if (url == null && remoteBlobUrl != null) {
+            try {
+                url = new URL(remoteBlobUrl);
+            } catch (MalformedURLException e) {
+                throw new JMSException(e.getMessage());
+            }
+        }
+        return url;
+    }
+
+    public void setURL(URL url) {
+        this.url = url;
+        remoteBlobUrl = url != null ? url.toExternalForm() : null;
+    }
+
+    @Override
+    public void onSend() throws JMSException {
+        super.onSend();
+
+        // lets ensure we upload the BLOB first out of band before we send the
+        // message
+        // TODO - Lets support this later.
+    }
+
+    public void deleteFile() throws IOException, JMSException {
+    }
+}


Mime
View raw message