activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [33/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:42:03 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java
new file mode 100644
index 0000000..c13a4e8
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketDecoder.java
@@ -0,0 +1,474 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl;
+
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V2;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY_V3;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATESESSION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT_V2;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.PING;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.REATTACH_SESSION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.REATTACH_SESSION_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_CREDITS;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_FAIL_CREDITS;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_START;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_STOP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_END;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_START;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
+import static org.apache.activemq6.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY_V2;
+
+import java.io.Serializable;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.client.HornetQClientMessageBundle;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateQueueMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.NullResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.Ping;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.RollbackMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCloseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCommitMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionExpireMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXACommitMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAEndMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
+
+/**
+ * Factory that turns a packet type code into a freshly constructed, still-empty {@link Packet}
+ * of the matching wire-format class.
+ * <p>
+ * Subclasses supply {@link #decode(HornetQBuffer)}; this class only provides the type-code
+ * lookup in {@link #decode(byte)}.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public abstract class PacketDecoder implements Serializable
+{
+   /**
+    * Decodes a packet from the given buffer. The implementation is left to subclasses.
+    *
+    * @param in the buffer to decode from
+    * @return the decoded packet
+    */
+   public abstract Packet decode(final HornetQBuffer in);
+
+   /**
+    * Instantiates the packet class registered for {@code packetType}. Nothing is read from any
+    * buffer here; the returned packet is constructed with its no-arg constructor (or, for the
+    * payload-less types, as a bare {@link PacketImpl} carrying only the type code).
+    *
+    * @param packetType one of the type codes declared on {@link PacketImpl}
+    * @return a new packet instance for that type
+    * @throws RuntimeException the exception built by
+    *         {@code HornetQClientMessageBundle.BUNDLE.invalidType} when the code is not handled
+    *         here (NOTE(review): exact exception class is declared on the bundle - confirm)
+    */
+   public Packet decode(byte packetType)
+   {
+      switch (packetType)
+      {
+         case PING:
+            return new Ping();
+         case DISCONNECT:
+            return new DisconnectMessage();
+         case DISCONNECT_V2:
+            return new DisconnectMessage_V2();
+         case DISCONNECT_CONSUMER:
+            return new DisconnectConsumerMessage();
+         case EXCEPTION:
+            return new HornetQExceptionMessage();
+         case PACKETS_CONFIRMED:
+            return new PacketsConfirmedMessage();
+         case CREATESESSION:
+            return new CreateSessionMessage();
+         case CHECK_FOR_FAILOVER:
+            return new CheckFailoverMessage();
+         case CREATESESSION_RESP:
+            return new CreateSessionResponseMessage();
+         case REATTACH_SESSION:
+            return new ReattachSessionMessage();
+         case REATTACH_SESSION_RESP:
+            return new ReattachSessionResponseMessage();
+         case SESS_CLOSE:
+            return new SessionCloseMessage();
+         case SESS_CREATECONSUMER:
+            return new SessionCreateConsumerMessage();
+         case SESS_ACKNOWLEDGE:
+            return new SessionAcknowledgeMessage();
+         case SESS_EXPIRED:
+            return new SessionExpireMessage();
+         case SESS_COMMIT:
+            return new SessionCommitMessage();
+         case SESS_ROLLBACK:
+            return new RollbackMessage();
+         case SESS_QUEUEQUERY:
+            return new SessionQueueQueryMessage();
+         case SESS_QUEUEQUERY_RESP:
+            return new SessionQueueQueryResponseMessage();
+         case CREATE_QUEUE:
+            return new CreateQueueMessage();
+         case CREATE_SHARED_QUEUE:
+            return new CreateSharedQueueMessage();
+         case DELETE_QUEUE:
+            return new SessionDeleteQueueMessage();
+         case SESS_BINDINGQUERY:
+            return new SessionBindingQueryMessage();
+         case SESS_BINDINGQUERY_RESP:
+            return new SessionBindingQueryResponseMessage();
+         case SESS_XA_START:
+            return new SessionXAStartMessage();
+         case SESS_XA_FAILED:
+            return new SessionXAAfterFailedMessage();
+         case SESS_XA_END:
+            return new SessionXAEndMessage();
+         case SESS_XA_COMMIT:
+            return new SessionXACommitMessage();
+         case SESS_XA_PREPARE:
+            return new SessionXAPrepareMessage();
+         case SESS_XA_RESP:
+            return new SessionXAResponseMessage();
+         case SESS_XA_ROLLBACK:
+            return new SessionXARollbackMessage();
+         case SESS_XA_JOIN:
+            return new SessionXAJoinMessage();
+         case SESS_XA_RESUME:
+            return new SessionXAResumeMessage();
+         case SESS_XA_FORGET:
+            return new SessionXAForgetMessage();
+         case SESS_XA_INDOUBT_XIDS_RESP:
+            return new SessionXAGetInDoubtXidsResponseMessage();
+         case SESS_XA_SET_TIMEOUT:
+            return new SessionXASetTimeoutMessage();
+         case SESS_XA_SET_TIMEOUT_RESP:
+            return new SessionXASetTimeoutResponseMessage();
+         case SESS_XA_GET_TIMEOUT_RESP:
+            return new SessionXAGetTimeoutResponseMessage();
+         // These types have no dedicated message class: a plain PacketImpl tagged with the
+         // type code is used. Inside each case packetType equals the matched constant.
+         case SESS_XA_SUSPEND:
+         case SESS_XA_INDOUBT_XIDS:
+         case SESS_XA_GET_TIMEOUT:
+         case SESS_START:
+         case SESS_STOP:
+            return new PacketImpl(packetType);
+         case SESS_FLOWTOKEN:
+            return new SessionConsumerFlowCreditMessage();
+         case SESS_CONSUMER_CLOSE:
+            return new SessionConsumerCloseMessage();
+         case SESS_INDIVIDUAL_ACKNOWLEDGE:
+            return new SessionIndividualAcknowledgeMessage();
+         case NULL_RESPONSE:
+            return new NullResponseMessage();
+         case SESS_RECEIVE_CONTINUATION:
+            return new SessionReceiveContinuationMessage();
+         case SESS_SEND_CONTINUATION:
+            return new SessionSendContinuationMessage();
+         case SESS_PRODUCER_REQUEST_CREDITS:
+            return new SessionRequestProducerCreditsMessage();
+         case SESS_PRODUCER_CREDITS:
+            return new SessionProducerCreditsMessage();
+         case SESS_PRODUCER_FAIL_CREDITS:
+            return new SessionProducerCreditsFailMessage();
+         case SESS_FORCE_CONSUMER_DELIVERY:
+            return new SessionForceConsumerDelivery();
+         case CLUSTER_TOPOLOGY:
+            return new ClusterTopologyChangeMessage();
+         case CLUSTER_TOPOLOGY_V2:
+            return new ClusterTopologyChangeMessage_V2();
+         case CLUSTER_TOPOLOGY_V3:
+            return new ClusterTopologyChangeMessage_V3();
+         case SUBSCRIBE_TOPOLOGY:
+            return new SubscribeClusterTopologyUpdatesMessage();
+         case SUBSCRIBE_TOPOLOGY_V2:
+            return new SubscribeClusterTopologyUpdatesMessageV2();
+         case SESS_ADD_METADATA:
+            return new SessionAddMetaDataMessage();
+         case SESS_ADD_METADATA2:
+            return new SessionAddMetaDataMessageV2();
+         case SESS_UNIQUE_ADD_METADATA:
+            return new SessionUniqueAddMetaDataMessage();
+         case PacketImpl.CHECK_FOR_FAILOVER_REPLY:
+            return new CheckFailoverReplyMessage();
+         default:
+            throw HornetQClientMessageBundle.BUNDLE.invalidType(packetType);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java
new file mode 100644
index 0000000..66f72fe
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/PacketImpl.java
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.utils.DataConstants;
+
+/**
+ * Base {@link Packet} implementation. It stores the packet type, the channel id and the
+ * encoded/decoded size, and writes and reads the header fields shared by all packets.
+ * <p>
+ * {@link #encodeRest(HornetQBuffer)} and {@link #decodeRest(HornetQBuffer)} are empty here and
+ * are invoked from {@link #encode(RemotingConnection)} and {@link #decode(HornetQBuffer)}, so a
+ * subclass can override them to handle its own payload.
+ * <p>
+ * The class also declares the byte codes identifying each packet type.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class PacketImpl implements Packet
+{
+   // Constants -------------------------------------------------------------------------
+
+   // The minimal size for all the packets, Common data for all the packets (look at
+   // PacketImpl.encode)
+   public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
+                                                 DataConstants.SIZE_LONG;
+
+   // Initial capacity requested from the connection when a buffer is created in encode()
+   private static final int INITIAL_PACKET_SIZE = 1500;
+
+   protected long channelID;
+
+   private final byte type;
+
+   // Stays at -1 until encode() or decode() has run; see getPacketSize()
+   protected int size = -1;
+
+   // The packet types
+   // -----------------------------------------------------------------------------------
+
+   public static final byte PING = 10;
+
+   public static final byte DISCONNECT = 11;
+
+   public static final byte DISCONNECT_CONSUMER = 12;
+
+   // Miscellaneous
+   public static final byte EXCEPTION = 20;
+
+   public static final byte NULL_RESPONSE = 21;
+
+   public static final byte PACKETS_CONFIRMED = 22;
+
+   // Server
+   public static final byte CREATESESSION = 30;
+
+   public static final byte CREATESESSION_RESP = 31;
+
+   public static final byte REATTACH_SESSION = 32;
+
+   public static final byte REATTACH_SESSION_RESP = 33;
+
+   public static final byte CREATE_QUEUE = 34;
+
+   public static final byte DELETE_QUEUE = 35;
+
+   public static final byte CREATE_SHARED_QUEUE = 36;
+
+   // Session
+
+   public static final byte SESS_XA_FAILED = 39;
+
+   public static final byte SESS_CREATECONSUMER = 40;
+
+   public static final byte SESS_ACKNOWLEDGE = 41;
+
+   public static final byte SESS_EXPIRED = 42;
+
+   public static final byte SESS_COMMIT = 43;
+
+   public static final byte SESS_ROLLBACK = 44;
+
+   public static final byte SESS_QUEUEQUERY = 45;
+
+   public static final byte SESS_QUEUEQUERY_RESP = 46;
+
+   public static final byte SESS_BINDINGQUERY = 49;
+
+   public static final byte SESS_BINDINGQUERY_RESP = 50;
+
+   public static final byte SESS_XA_START = 51;
+
+   public static final byte SESS_XA_END = 52;
+
+   public static final byte SESS_XA_COMMIT = 53;
+
+   public static final byte SESS_XA_PREPARE = 54;
+
+   public static final byte SESS_XA_RESP = 55;
+
+   public static final byte SESS_XA_ROLLBACK = 56;
+
+   public static final byte SESS_XA_JOIN = 57;
+
+   public static final byte SESS_XA_SUSPEND = 58;
+
+   public static final byte SESS_XA_RESUME = 59;
+
+   public static final byte SESS_XA_FORGET = 60;
+
+   public static final byte SESS_XA_INDOUBT_XIDS = 61;
+
+   public static final byte SESS_XA_INDOUBT_XIDS_RESP = 62;
+
+   public static final byte SESS_XA_SET_TIMEOUT = 63;
+
+   public static final byte SESS_XA_SET_TIMEOUT_RESP = 64;
+
+   public static final byte SESS_XA_GET_TIMEOUT = 65;
+
+   public static final byte SESS_XA_GET_TIMEOUT_RESP = 66;
+
+   public static final byte SESS_START = 67;
+
+   public static final byte SESS_STOP = 68;
+
+   public static final byte SESS_CLOSE = 69;
+
+   public static final byte SESS_FLOWTOKEN = 70;
+
+   public static final byte SESS_SEND = 71;
+
+   public static final byte SESS_SEND_LARGE = 72;
+
+   public static final byte SESS_SEND_CONTINUATION = 73;
+
+   public static final byte SESS_CONSUMER_CLOSE = 74;
+
+   public static final byte SESS_RECEIVE_MSG = 75;
+
+   public static final byte SESS_RECEIVE_LARGE_MSG = 76;
+
+   public static final byte SESS_RECEIVE_CONTINUATION = 77;
+
+   public static final byte SESS_FORCE_CONSUMER_DELIVERY = 78;
+
+   public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
+
+   public static final byte SESS_PRODUCER_CREDITS = 80;
+
+   public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
+
+   public static final byte SESS_PRODUCER_FAIL_CREDITS = 82;
+
+   // Replication
+
+   public static final byte REPLICATION_RESPONSE = 90;
+
+   public static final byte REPLICATION_APPEND = 91;
+
+   public static final byte REPLICATION_APPEND_TX = 92;
+
+   public static final byte REPLICATION_DELETE = 93;
+
+   public static final byte REPLICATION_DELETE_TX = 94;
+
+   public static final byte REPLICATION_PREPARE = 95;
+
+   public static final byte REPLICATION_COMMIT_ROLLBACK = 96;
+
+   public static final byte REPLICATION_PAGE_WRITE = 97;
+
+   public static final byte REPLICATION_PAGE_EVENT = 98;
+
+   public static final byte REPLICATION_LARGE_MESSAGE_BEGIN = 99;
+
+   public static final byte REPLICATION_LARGE_MESSAGE_END = 100;
+
+   public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 101;
+
+   /*
+    * code 102 was REPLICATION_COMPARE_DATA, released into production as a message, but as part of
+    * the (then) non-functional replication system.
+    */
+
+   public static final byte REPLICATION_SYNC_FILE = 103;
+
+   public static final byte SESS_ADD_METADATA = 104;
+
+   public static final byte SESS_ADD_METADATA2 = 105;
+
+   public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+
+
+   // HA
+
+   public static final byte CLUSTER_TOPOLOGY = 110;
+
+   public static final byte NODE_ANNOUNCE = 111;
+
+   public static final byte SUBSCRIBE_TOPOLOGY = 112;
+
+   // For newer versions
+
+   public static final byte SUBSCRIBE_TOPOLOGY_V2 = 113;
+
+   public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
+   public static final byte BACKUP_REGISTRATION = 115;
+   public static final byte BACKUP_REGISTRATION_FAILED = 116;
+
+   public static final byte REPLICATION_START_FINISH_SYNC = 120;
+   public static final byte REPLICATION_SCHEDULED_FAILOVER = 121;
+
+   public static final byte CLUSTER_TOPOLOGY_V3 = 122;
+
+   //do not reuse
+   //public static final byte NODE_ANNOUNCE_V2 = 123;
+
+   public static final byte DISCONNECT_V2 = 124;
+
+   public static final byte CLUSTER_CONNECT = 125;
+
+   public static final byte CLUSTER_CONNECT_REPLY = 126;
+
+   public static final byte BACKUP_REQUEST = 127;
+
+   //oops ran out of positive bytes
+   public static final byte BACKUP_REQUEST_RESPONSE = -1;
+
+   public static final byte QUORUM_VOTE = -2;
+
+   public static final byte QUORUM_VOTE_REPLY = -3;
+
+   public static final byte CHECK_FOR_FAILOVER = -4;
+
+   public static final byte CHECK_FOR_FAILOVER_REPLY = -5;
+
+   public static final byte SCALEDOWN_ANNOUNCEMENT = -6;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a packet of the given type.
+    *
+    * @param type the packet type code; fixed for the lifetime of the packet
+    */
+   public PacketImpl(final byte type)
+   {
+      this.type = type;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the type code this packet was constructed with
+    */
+   public byte getType()
+   {
+      return type;
+   }
+
+   /**
+    * @return the channel id currently set on this packet
+    */
+   public long getChannelID()
+   {
+      return channelID;
+   }
+
+   /**
+    * @param channelID the channel id to store on this packet
+    */
+   public void setChannelID(final long channelID)
+   {
+      this.channelID = channelID;
+   }
+
+   /**
+    * Encodes this packet into a new buffer obtained from the connection.
+    * <p>
+    * The header is written as an int length placeholder, the type byte and the channel id;
+    * {@link #encodeRest(HornetQBuffer)} then appends the payload. Finally the placeholder is
+    * overwritten with the number of bytes that follow the length field, and {@code size} is set
+    * to the buffer's writer index.
+    *
+    * @param connection the connection used to allocate the buffer
+    * @return the buffer holding the encoded packet
+    */
+   public HornetQBuffer encode(final RemotingConnection connection)
+   {
+      HornetQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+
+      // The standard header fields
+
+      buffer.writeInt(0); // The length gets filled in at the end
+      buffer.writeByte(type);
+      buffer.writeLong(channelID);
+
+      encodeRest(buffer);
+
+      size = buffer.writerIndex();
+
+      // The length doesn't include the length field itself (an int)
+      int len = size - DataConstants.SIZE_INT;
+
+      buffer.setInt(0, len);
+
+      return buffer;
+   }
+
+   /**
+    * Reads the channel id from the buffer, delegates the payload to
+    * {@link #decodeRest(HornetQBuffer)} and records the buffer's reader index as the size.
+    * <p>
+    * The length and type header fields are not read here (presumably the caller has already
+    * consumed them - confirm against the decoder).
+    *
+    * @param buffer the buffer to read from
+    */
+   public void decode(final HornetQBuffer buffer)
+   {
+      channelID = buffer.readLong();
+
+      decodeRest(buffer);
+
+      size = buffer.readerIndex();
+   }
+
+   /**
+    * @return the size recorded by the last {@code encode} or {@code decode} call
+    * @throws IllegalStateException if the packet has been neither encoded nor decoded yet
+    */
+   public int getPacketSize()
+   {
+      if (size == -1)
+      {
+         throw new IllegalStateException("Packet hasn't been encoded/decoded yet");
+      }
+
+      return size;
+   }
+
+   /**
+    * @return {@code false} in this base class
+    */
+   public boolean isResponse()
+   {
+      return false;
+   }
+
+   /**
+    * Called by {@link #encode(RemotingConnection)} after the header has been written. Empty in
+    * this base class.
+    *
+    * @param buffer the buffer positioned just after the header
+    */
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+   }
+
+   /**
+    * Called by {@link #decode(HornetQBuffer)} after the channel id has been read. Empty in this
+    * base class.
+    *
+    * @param buffer the buffer positioned just after the channel id
+    */
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+   }
+
+   /**
+    * @return {@code true} in this base class
+    */
+   public boolean isRequiresConfirmations()
+   {
+      return true;
+   }
+
+   /**
+    * @return {@code false} in this base class
+    */
+   public boolean isAsyncExec()
+   {
+      return false;
+   }
+
+   /**
+    * @return {@link #getParentString()} followed by the closing bracket
+    */
+   @Override
+   public String toString()
+   {
+      return getParentString() + "]";
+   }
+
+   /**
+    * Hashes the channel id, size and type - the same fields compared by
+    * {@link #equals(Object)}.
+    */
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + (int)(channelID ^ (channelID >>> 32));
+      result = prime * result + size;
+      result = prime * result + type;
+      return result;
+   }
+
+   /**
+    * Two packets are equal when their channel id, size and type all match.
+    *
+    * @param obj the object to compare with
+    * @return {@code true} if {@code obj} is a {@code PacketImpl} with the same channel id, size
+    *         and type
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!(obj instanceof PacketImpl))
+      {
+         return false;
+      }
+      PacketImpl other = (PacketImpl)obj;
+      // type must match (was "!="): otherwise packets with identical fields are unequal and
+      // packets of different types are equal, contradicting hashCode()
+      return (channelID == other.channelID) && (size == other.size) && (type == other.type);
+   }
+
+   /**
+    * Builds the opening part of the string form, left without a closing bracket so that more
+    * fields can be appended before it is closed (as {@link #toString()} does).
+    *
+    * @return the unterminated description of this packet
+    */
+   protected String getParentString()
+   {
+      return "PACKET("  + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
+   }
+
+   /**
+    * Computes {@code DataConstants.SIZE_INT} plus twice the string length (presumably an int
+    * length prefix followed by two bytes per char - confirm against the buffer's string
+    * encoding).
+    *
+    * @param str the string to size; must not be {@code null}
+    * @return the computed size
+    */
+   private int stringEncodeSize(final String str)
+   {
+      return DataConstants.SIZE_INT + str.length() * 2;
+   }
+
+   /**
+    * Computes {@code DataConstants.SIZE_BOOLEAN} plus, when the string is not {@code null}, the
+    * value of {@code stringEncodeSize(str)}.
+    *
+    * @param str the string to size; may be {@code null}
+    * @return the computed size
+    */
+   protected int nullableStringEncodeSize(final String str)
+   {
+      return DataConstants.SIZE_BOOLEAN + (str != null ? stringEncodeSize(str) : 0);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java
new file mode 100644
index 0000000..f30f073
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -0,0 +1,470 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.Interceptor;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.core.protocol.core.Channel;
+import org.apache.activemq6.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq6.core.protocol.core.Packet;
+import org.apache.activemq6.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.apache.activemq6.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
+import org.apache.activemq6.core.security.HornetQPrincipal;
+import org.apache.activemq6.spi.core.protocol.AbstractRemotingConnection;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.utils.SimpleIDGenerator;
+
+/**
+ * @author <a href="tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection
+{
+   // Constants
+   // ------------------------------------------------------------------------------------
+
+   private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
+
+   // Static
+   // ---------------------------------------------------------------------------------------
+
+   // Attributes
+   // -----------------------------------------------------------------------------------
+   private final PacketDecoder packetDecoder;
+
+   private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
+
+   private final long blockingCallTimeout;
+
+   private final long blockingCallFailoverTimeout;
+
+   private final List<Interceptor> incomingInterceptors;
+
+   private final List<Interceptor> outgoingInterceptors;
+
+   private volatile boolean destroyed;
+
+   private final boolean client;
+
+   private int clientVersion;
+
+   private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
+
+   private boolean idGeneratorSynced = false;
+
+   private final Object transferLock = new Object();
+
+   private final Object failLock = new Object();
+
+   private volatile boolean executing;
+
+   private final SimpleString nodeID;
+
+   private String clientID;
+
+   // Constructors
+   // ---------------------------------------------------------------------------------
+
+   /*
+    * Create a client side connection
+    */
+   public RemotingConnectionImpl(final PacketDecoder packetDecoder,
+                                 final Connection transportConnection,
+                                 final long blockingCallTimeout,
+                                 final long blockingCallFailoverTimeout,
+                                 final List<Interceptor> incomingInterceptors,
+                                 final List<Interceptor> outgoingInterceptors)
+   {
+      this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, null);
+   }
+
+   /*
+    * Create a server side connection
+    */
+   RemotingConnectionImpl(final PacketDecoder packetDecoder,
+                          final Connection transportConnection,
+                          final List<Interceptor> incomingInterceptors,
+                          final List<Interceptor> outgoingInterceptors,
+                          final Executor executor,
+                          final SimpleString nodeID)
+
+   {
+      this(packetDecoder, transportConnection, -1, -1, incomingInterceptors, outgoingInterceptors, false, executor, nodeID);
+   }
+
+   private RemotingConnectionImpl(final PacketDecoder packetDecoder,
+                                  final Connection transportConnection,
+                                  final long blockingCallTimeout,
+                                  final long blockingCallFailoverTimeout,
+                                  final List<Interceptor> incomingInterceptors,
+                                  final List<Interceptor> outgoingInterceptors,
+                                  final boolean client,
+                                  final Executor executor,
+                                  final SimpleString nodeID)
+
+   {
+      super(transportConnection, executor);
+
+      this.packetDecoder = packetDecoder;
+
+      this.blockingCallTimeout = blockingCallTimeout;
+
+      this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;
+
+      this.incomingInterceptors = incomingInterceptors;
+
+      this.outgoingInterceptors = outgoingInterceptors;
+
+      this.client = client;
+
+      this.nodeID = nodeID;
+
+      transportConnection.setProtocolConnection(this);
+   }
+
+
+   // RemotingConnection implementation
+   // ------------------------------------------------------------
+
+   @Override
+   public String toString()
+   {
+      return "RemotingConnectionImpl [clientID=" + clientID +
+         ", nodeID=" +
+         nodeID +
+         ", transportConnection=" +
+         getTransportConnection() +
+         "]";
+   }
+
+   /**
+    * @return the clientVersion
+    */
+   public int getClientVersion()
+   {
+      return clientVersion;
+   }
+
+   /**
+    * @param clientVersion the clientVersion to set
+    */
+   public void setClientVersion(int clientVersion)
+   {
+      this.clientVersion = clientVersion;
+   }
+
+   public synchronized Channel getChannel(final long channelID, final int confWindowSize)
+   {
+      Channel channel = channels.get(channelID);
+
+      if (channel == null)
+      {
+         channel = new ChannelImpl(this, channelID, confWindowSize, outgoingInterceptors);
+
+         channels.put(channelID, channel);
+      }
+
+      return channel;
+   }
+
+   public synchronized boolean removeChannel(final long channelID)
+   {
+      return channels.remove(channelID) != null;
+   }
+
+   public synchronized void putChannel(final long channelID, final Channel channel)
+   {
+      channels.put(channelID, channel);
+   }
+
+   public void fail(final HornetQException me, String scaleDownTargetNodeID)
+   {
+      synchronized (failLock)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         destroyed = true;
+      }
+
+      HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+
+      try
+      {
+         transportConnection.forceClose();
+      }
+      catch (Throwable e)
+      {
+         HornetQClientLogger.LOGGER.warn(e.getMessage(), e);
+      }
+
+      // Then call the listeners
+      callFailureListeners(me, scaleDownTargetNodeID);
+
+      callClosingListeners();
+
+      internalClose();
+
+      for (Channel channel : channels.values())
+      {
+         channel.returnBlocking(me);
+      }
+   }
+
+   public void destroy()
+   {
+      synchronized (failLock)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         destroyed = true;
+      }
+
+      internalClose();
+
+      callClosingListeners();
+   }
+
+   public void disconnect(final boolean criticalError)
+   {
+      disconnect(null, criticalError);
+   }
+
+   public void disconnect(String scaleDownNodeID, final boolean criticalError)
+   {
+      Channel channel0 = getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
+
+      // And we remove all channels from the connection, this ensures no more packets will be processed after this
+      // method is
+      // complete
+
+      Set<Channel> allChannels = new HashSet<Channel>(channels.values());
+
+      if (!criticalError)
+      {
+         removeAllChannels();
+      }
+      else
+      {
+         // We can't hold a lock if a critical error is happening...
+         // as other threads will be holding the lock while hanging on IO
+         channels.clear();
+      }
+
+      // Now we are 100% sure that no more packets will be processed we can flush then send the disconnect
+
+      if (!criticalError)
+      {
+         for (Channel channel : allChannels)
+         {
+            channel.flushConfirmations();
+         }
+      }
+      Packet disconnect;
+
+      if (channel0.supports(PacketImpl.DISCONNECT_V2))
+      {
+         disconnect = new DisconnectMessage_V2(nodeID, scaleDownNodeID);
+      }
+      else
+      {
+         disconnect = new DisconnectMessage(nodeID);
+      }
+      channel0.sendAndFlush(disconnect);
+   }
+
+   public long generateChannelID()
+   {
+      return idGenerator.generateID();
+   }
+
+   public synchronized void syncIDGeneratorSequence(final long id)
+   {
+      if (!idGeneratorSynced)
+      {
+         idGenerator = new SimpleIDGenerator(id);
+
+         idGeneratorSynced = true;
+      }
+   }
+
+   public long getIDGeneratorSequence()
+   {
+      return idGenerator.getCurrentID();
+   }
+
+   public Object getTransferLock()
+   {
+      return transferLock;
+   }
+
+   public boolean isClient()
+   {
+      return client;
+   }
+
+   public boolean isDestroyed()
+   {
+      return destroyed;
+   }
+
+   public long getBlockingCallTimeout()
+   {
+      return blockingCallTimeout;
+   }
+
+   @Override
+   public long getBlockingCallFailoverTimeout()
+   {
+      return blockingCallFailoverTimeout;
+   }
+
+   //We flush any confirmations on the connection - this prevents idle bridges for example
+   //sitting there with many unacked messages
+   public void flush()
+   {
+      synchronized (transferLock)
+      {
+         for (Channel channel : channels.values())
+         {
+            channel.flushConfirmations();
+         }
+      }
+   }
+
+   public HornetQPrincipal getDefaultHornetQPrincipal()
+   {
+      return getTransportConnection().getDefaultHornetQPrincipal();
+   }
+
+   // Buffer Handler implementation
+   // ----------------------------------------------------
+   public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+   {
+      try
+      {
+         final Packet packet = packetDecoder.decode(buffer);
+
+         if (isTrace)
+         {
+            HornetQClientLogger.LOGGER.trace("handling packet " + packet);
+         }
+
+         if (packet.isAsyncExec() && executor != null)
+         {
+            executing = true;
+
+            executor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     doBufferReceived(packet);
+                  }
+                  catch (Throwable t)
+                  {
+                     HornetQClientLogger.LOGGER.errorHandlingPacket(t, packet);
+                  }
+
+                  executing = false;
+               }
+            });
+         }
+         else
+         {
+            //To prevent out of order execution if interleaving sync and async operations on same connection
+            while (executing)
+            {
+               Thread.yield();
+            }
+
+            // Pings must always be handled out of band so we can send pings back to the client quickly
+            // otherwise they would get in the queue with everything else which might give an intolerable delay
+            doBufferReceived(packet);
+         }
+
+         super.bufferReceived(connectionID, buffer);
+      }
+      catch (Exception e)
+      {
+         HornetQClientLogger.LOGGER.errorDecodingPacket(e);
+      }
+   }
+
+   private void doBufferReceived(final Packet packet)
+   {
+      if (ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null)
+      {
+         return;
+      }
+
+      synchronized (transferLock)
+      {
+         final Channel channel = channels.get(packet.getChannelID());
+
+         if (channel != null)
+         {
+            channel.handlePacket(packet);
+         }
+      }
+   }
+
+   protected void removeAllChannels()
+   {
+      // We get the transfer lock first - this ensures no packets are being processed AND
+      // it's guaranteed no more packets will be processed once this method is complete
+      synchronized (transferLock)
+      {
+         channels.clear();
+      }
+   }
+
+   private void internalClose()
+   {
+      // We close the underlying transport connection
+      getTransportConnection().close();
+
+      for (Channel channel : channels.values())
+      {
+         channel.close();
+      }
+   }
+
+   public void setClientID(String cID)
+   {
+      clientID = cID;
+   }
+
+   public String getClientID()
+   {
+      return clientID;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java
new file mode 100644
index 0000000..ab1ec8c
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet of type {@code CHECK_FOR_FAILOVER} whose body is a single nullable node ID string.
+ */
+public class CheckFailoverMessage extends PacketImpl
+{
+   private String nodeID;
+
+   /**
+    * Creates a message without a node ID; {@link #decodeRest(HornetQBuffer)} fills it in.
+    */
+   public CheckFailoverMessage()
+   {
+      this((String) null);
+   }
+
+   /**
+    * @param nodeID the node ID to carry, may be {@code null}
+    */
+   public CheckFailoverMessage(final String nodeID)
+   {
+      super(CHECK_FOR_FAILOVER);
+      this.nodeID = nodeID;
+   }
+
+   /**
+    * @return the node ID carried by this message, possibly {@code null}
+    */
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   /** Writes the node ID as a nullable string. */
+   @Override
+   public void encodeRest(HornetQBuffer buffer)
+   {
+      buffer.writeNullableString(nodeID);
+   }
+
+   /** Reads the node ID as a nullable string. */
+   @Override
+   public void decodeRest(HornetQBuffer buffer)
+   {
+      final String decoded = buffer.readNullableString();
+      nodeID = decoded;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java
new file mode 100644
index 0000000..d2fbc49
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CheckFailoverReplyMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Response packet of type {@code CHECK_FOR_FAILOVER_REPLY} whose body is a single boolean.
+ */
+public class CheckFailoverReplyMessage extends PacketImpl
+{
+   private boolean okToFailover;
+
+   /**
+    * Creates a reply with the flag unset; {@link #decodeRest(HornetQBuffer)} fills it in.
+    */
+   public CheckFailoverReplyMessage()
+   {
+      this(false);
+   }
+
+   /**
+    * @param okToFailover the flag to carry
+    */
+   public CheckFailoverReplyMessage(boolean okToFailover)
+   {
+      super(CHECK_FOR_FAILOVER_REPLY);
+      this.okToFailover = okToFailover;
+   }
+
+   /**
+    * @return the flag carried by this reply
+    */
+   public boolean isOkToFailover()
+   {
+      return okToFailover;
+   }
+
+   /**
+    * @return always {@code true}
+    */
+   @Override
+   public boolean isResponse()
+   {
+      return true;
+   }
+
+   /** Writes the flag as one boolean. */
+   @Override
+   public void encodeRest(HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(okToFailover);
+   }
+
+   /** Reads the flag as one boolean. */
+   @Override
+   public void decodeRest(HornetQBuffer buffer)
+   {
+      final boolean decoded = buffer.readBoolean();
+      okToFailover = decoded;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
new file mode 100644
index 0000000..0c3a998
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet describing a cluster topology change. The body holds an exit flag and a node ID; when the
+ * exit flag is not set it additionally holds two optional {@link TransportConfiguration}s (the pair)
+ * and a "last" flag.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class ClusterTopologyChangeMessage extends PacketImpl
+{
+   protected boolean exit;
+
+   protected String nodeID;
+
+   protected Pair<TransportConfiguration, TransportConfiguration> pair;
+
+   protected boolean last;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates an empty message of type {@code CLUSTER_TOPOLOGY}.
+    */
+   public ClusterTopologyChangeMessage()
+   {
+      super(CLUSTER_TOPOLOGY);
+   }
+
+   /**
+    * Creates an empty message with the given packet type.
+    *
+    * @param clusterTopologyV2 the packet type passed to the superclass
+    */
+   public ClusterTopologyChangeMessage(byte clusterTopologyV2)
+   {
+      super(clusterTopologyV2);
+   }
+
+   /**
+    * Creates a message with the exit flag set for the given node.
+    */
+   public ClusterTopologyChangeMessage(final String nodeID)
+   {
+      super(CLUSTER_TOPOLOGY);
+
+      this.nodeID = nodeID;
+
+      this.exit = true;
+   }
+
+   /**
+    * Creates a message with the exit flag cleared, carrying the given pair and "last" flag.
+    */
+   public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+   {
+      super(CLUSTER_TOPOLOGY);
+
+      this.exit = false;
+
+      this.nodeID = nodeID;
+
+      this.pair = pair;
+
+      this.last = last;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   public Pair<TransportConfiguration, TransportConfiguration> getPair()
+   {
+      return pair;
+   }
+
+   public boolean isLast()
+   {
+      return last;
+   }
+
+   public boolean isExit()
+   {
+      return exit;
+   }
+
+   /**
+    * Writes the exit flag and node ID, followed (only when the exit flag is not set) by both
+    * optional configurations of the pair and the "last" flag.
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(exit);
+      buffer.writeString(nodeID);
+      if (exit)
+      {
+         return;
+      }
+      writeOptionalConfig(buffer, pair.getA());
+      writeOptionalConfig(buffer, pair.getB());
+      buffer.writeBoolean(last);
+   }
+
+   /**
+    * Reads the fields in the order {@link #encodeRest(HornetQBuffer)} writes them.
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      exit = buffer.readBoolean();
+      nodeID = buffer.readString();
+      if (exit)
+      {
+         return;
+      }
+      final TransportConfiguration live = readOptionalConfig(buffer);
+      final TransportConfiguration backup = readOptionalConfig(buffer);
+      pair = new Pair<TransportConfiguration, TransportConfiguration>(live, backup);
+      last = buffer.readBoolean();
+   }
+
+   @Override
+   public int hashCode()
+   {
+      int hash = super.hashCode();
+      hash = 31 * hash + (exit ? 1231 : 1237);
+      hash = 31 * hash + (last ? 1231 : 1237);
+      hash = 31 * hash + (nodeID != null ? nodeID.hashCode() : 0);
+      hash = 31 * hash + (pair != null ? pair.hashCode() : 0);
+      return hash;
+   }
+
+   /**
+    * Equal when the superclass considers the objects equal and the exit flag, "last" flag,
+    * node ID and pair all match.
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!super.equals(obj) || !(obj instanceof ClusterTopologyChangeMessage))
+      {
+         return false;
+      }
+      final ClusterTopologyChangeMessage that = (ClusterTopologyChangeMessage) obj;
+      if (exit != that.exit || last != that.last)
+      {
+         return false;
+      }
+      final boolean sameNode = (nodeID == null) ? (that.nodeID == null) : nodeID.equals(that.nodeID);
+      if (!sameNode)
+      {
+         return false;
+      }
+      return (pair == null) ? (that.pair == null) : pair.equals(that.pair);
+   }
+
+   // Private -------------------------------------------------------
+
+   /** Writes a presence flag, followed by the configuration itself when it is not null. */
+   private static void writeOptionalConfig(final HornetQBuffer buffer, final TransportConfiguration config)
+   {
+      final boolean present = config != null;
+      buffer.writeBoolean(present);
+      if (present)
+      {
+         config.encode(buffer);
+      }
+   }
+
+   /** Reads a presence flag and, when it is set, a configuration; returns null otherwise. */
+   private static TransportConfiguration readOptionalConfig(final HornetQBuffer buffer)
+   {
+      if (!buffer.readBoolean())
+      {
+         return null;
+      }
+      final TransportConfiguration config = new TransportConfiguration();
+      config.decode(buffer);
+      return config;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
new file mode 100644
index 0000000..5a1eeb9
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V2.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+
+/**
+ * Topology change packet that adds a unique event ID and a nullable backup group name to the
+ * fields of {@link ClusterTopologyChangeMessage}.
+ *
+ * @author Clebert Suconic
+ */
+public class ClusterTopologyChangeMessage_V2 extends ClusterTopologyChangeMessage
+{
+   protected long uniqueEventID;
+   protected String backupGroupName;
+
+   /**
+    * Creates an empty message of type {@code CLUSTER_TOPOLOGY_V2}.
+    */
+   public ClusterTopologyChangeMessage_V2()
+   {
+      super(CLUSTER_TOPOLOGY_V2);
+   }
+
+   /**
+    * Creates an empty message with the given packet type.
+    *
+    * @param clusterTopologyV3 the packet type passed to the superclass
+    */
+   public ClusterTopologyChangeMessage_V2(byte clusterTopologyV3)
+   {
+      super(clusterTopologyV3);
+   }
+
+   /**
+    * Creates a message with the exit flag set for the given node.
+    */
+   public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID)
+   {
+      super(CLUSTER_TOPOLOGY_V2);
+
+      this.uniqueEventID = uniqueEventID;
+
+      this.nodeID = nodeID;
+
+      this.exit = true;
+   }
+
+   /**
+    * Creates a message with the exit flag cleared, carrying the given pair, "last" flag and
+    * backup group name.
+    */
+   public ClusterTopologyChangeMessage_V2(final long uniqueEventID, final String nodeID, final String backupGroupName,
+                                          final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+   {
+      super(CLUSTER_TOPOLOGY_V2);
+
+      this.exit = false;
+
+      this.uniqueEventID = uniqueEventID;
+
+      this.nodeID = nodeID;
+
+      this.backupGroupName = backupGroupName;
+
+      this.pair = pair;
+
+      this.last = last;
+   }
+
+   /**
+    * @return the uniqueEventID
+    */
+   public long getUniqueEventID()
+   {
+      return uniqueEventID;
+   }
+
+   public String getBackupGroupName()
+   {
+      return backupGroupName;
+   }
+
+   /**
+    * Writes the exit flag, node ID and unique event ID; then (only when the exit flag is not set)
+    * both optional configurations of the pair and the "last" flag; finally the nullable backup
+    * group name.
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeBoolean(exit);
+      buffer.writeString(nodeID);
+      buffer.writeLong(uniqueEventID);
+      if (!exit)
+      {
+         encodeIfPresent(buffer, pair.getA());
+         encodeIfPresent(buffer, pair.getB());
+         buffer.writeBoolean(last);
+      }
+      buffer.writeNullableString(backupGroupName);
+   }
+
+   /**
+    * Reads the fields in the order {@link #encodeRest(HornetQBuffer)} writes them. The backup
+    * group name is read only when the buffer still has readable bytes.
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      exit = buffer.readBoolean();
+      nodeID = buffer.readString();
+      uniqueEventID = buffer.readLong();
+      if (!exit)
+      {
+         final TransportConfiguration live = decodeIfPresent(buffer);
+         final TransportConfiguration backup = decodeIfPresent(buffer);
+         pair = new Pair<TransportConfiguration, TransportConfiguration>(live, backup);
+         last = buffer.readBoolean();
+      }
+      final boolean hasMore = buffer.readableBytes() > 0;
+      if (hasMore)
+      {
+         backupGroupName = buffer.readNullableString();
+      }
+   }
+
+   @Override
+   public int hashCode()
+   {
+      int hash = super.hashCode();
+      hash = 31 * hash + (backupGroupName != null ? backupGroupName.hashCode() : 0);
+      hash = 31 * hash + (int) (uniqueEventID ^ (uniqueEventID >>> 32));
+      return hash;
+   }
+
+   /**
+    * Equal when the superclass considers the objects equal and the unique event ID and backup
+    * group name both match.
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!super.equals(obj) || !(obj instanceof ClusterTopologyChangeMessage_V2))
+      {
+         return false;
+      }
+      final ClusterTopologyChangeMessage_V2 that = (ClusterTopologyChangeMessage_V2) obj;
+      if (uniqueEventID != that.uniqueEventID)
+      {
+         return false;
+      }
+      return (backupGroupName == null) ? (that.backupGroupName == null) : backupGroupName.equals(that.backupGroupName);
+   }
+
+   /** Writes a presence flag, followed by the configuration itself when it is not null. */
+   private static void encodeIfPresent(final HornetQBuffer buffer, final TransportConfiguration config)
+   {
+      final boolean present = config != null;
+      buffer.writeBoolean(present);
+      if (present)
+      {
+         config.encode(buffer);
+      }
+   }
+
+   /** Reads a presence flag and, when it is set, a configuration; returns null otherwise. */
+   private static TransportConfiguration decodeIfPresent(final HornetQBuffer buffer)
+   {
+      if (!buffer.readBoolean())
+      {
+         return null;
+      }
+      final TransportConfiguration config = new TransportConfiguration();
+      config.decode(buffer);
+      return config;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java
new file mode 100644
index 0000000..7a78e6c
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage_V3.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.api.core.TransportConfiguration;
+
+/**
+ * Cluster topology change packet of type {@code CLUSTER_TOPOLOGY_V3}: everything carried by
+ * {@link ClusterTopologyChangeMessage_V2} plus a nullable scale-down group name, which is
+ * written to and read from the buffer after the superclass payload.
+ *
+ * @author Justin Bertram
+ */
+public class ClusterTopologyChangeMessage_V3 extends ClusterTopologyChangeMessage_V2
+{
+   private String scaleDownGroupName;
+
+   /**
+    * Creates a populated message. The inherited {@code exit} flag is always set to {@code false}.
+    *
+    * @param uniqueEventID      value stored in the inherited {@code uniqueEventID} field
+    * @param nodeID             value stored in the inherited {@code nodeID} field
+    * @param backupGroupName    value stored in the inherited {@code backupGroupName} field
+    * @param scaleDownGroupName scale-down group name; encoded as a nullable string
+    * @param pair               value stored in the inherited {@code pair} field
+    * @param last               value stored in the inherited {@code last} field
+    */
+   public ClusterTopologyChangeMessage_V3(final long uniqueEventID,
+                                          final String nodeID,
+                                          final String backupGroupName,
+                                          final String scaleDownGroupName,
+                                          final Pair<TransportConfiguration, TransportConfiguration> pair,
+                                          final boolean last)
+   {
+      super(CLUSTER_TOPOLOGY_V3);
+
+      this.nodeID = nodeID;
+      this.pair = pair;
+      this.last = last;
+      this.exit = false;
+      this.uniqueEventID = uniqueEventID;
+      this.backupGroupName = backupGroupName;
+      this.scaleDownGroupName = scaleDownGroupName;
+   }
+
+   /**
+    * Creates an empty message whose fields are expected to be filled by
+    * {@link #decodeRest(HornetQBuffer)}.
+    */
+   public ClusterTopologyChangeMessage_V3()
+   {
+      super(CLUSTER_TOPOLOGY_V3);
+   }
+
+   /**
+    * @return the scale-down group name, possibly {@code null}
+    */
+   public String getScaleDownGroupName()
+   {
+      return scaleDownGroupName;
+   }
+
+   /**
+    * Writes the superclass payload followed by the scale-down group name.
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      super.encodeRest(buffer);
+      buffer.writeNullableString(scaleDownGroupName);
+   }
+
+   /**
+    * Reads the superclass payload followed by the scale-down group name.
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      super.decodeRest(buffer);
+      scaleDownGroupName = buffer.readNullableString();
+   }
+
+   /**
+    * Extends the superclass hash with the scale-down group name.
+    */
+   @Override
+   public int hashCode()
+   {
+      final int base = super.hashCode();
+      final int groupHash = scaleDownGroupName != null ? scaleDownGroupName.hashCode() : 0;
+      return 31 * base + groupHash;
+   }
+
+   /**
+    * Equal when the superclass considers both objects equal, the other object is also a
+    * {@code ClusterTopologyChangeMessage_V3}, and the scale-down group names match
+    * (a {@code null} name only matches another {@code null}).
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!super.equals(obj) || !(obj instanceof ClusterTopologyChangeMessage_V3))
+      {
+         return false;
+      }
+      final String theirs = ((ClusterTopologyChangeMessage_V3) obj).scaleDownGroupName;
+      return scaleDownGroupName == null ? theirs == null : scaleDownGroupName.equals(theirs);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java
new file mode 100644
index 0000000..2be3e02
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateQueueMessage.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet of type {@code CREATE_QUEUE} carrying an address, a queue name, a nullable filter
+ * string, and the {@code durable}, {@code temporary} and {@code requiresResponse} flags.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class CreateQueueMessage extends PacketImpl
+{
+
+   private SimpleString address;
+
+   private SimpleString queueName;
+
+   private SimpleString filterString;
+
+   private boolean durable;
+
+   private boolean temporary;
+
+   private boolean requiresResponse;
+
+   /**
+    * Creates a populated message.
+    *
+    * @param address          address written first by {@link #encodeRest(HornetQBuffer)}
+    * @param queueName        queue name
+    * @param filterString     filter string; encoded as nullable, so {@code null} is accepted
+    * @param durable          durable flag
+    * @param temporary        temporary flag
+    * @param requiresResponse requires-response flag
+    */
+   public CreateQueueMessage(final SimpleString address,
+                             final SimpleString queueName,
+                             final SimpleString filterString,
+                             final boolean durable,
+                             final boolean temporary,
+                             final boolean requiresResponse)
+   {
+      this();
+
+      this.address = address;
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.durable = durable;
+      this.temporary = temporary;
+      this.requiresResponse = requiresResponse;
+   }
+
+   /**
+    * Creates an empty message whose fields are expected to be filled by
+    * {@link #decodeRest(HornetQBuffer)} or the setters.
+    */
+   public CreateQueueMessage()
+   {
+      super(CREATE_QUEUE);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Renders the parent description followed by every field of this message.
+    * {@code requiresResponse} is included so that two messages that differ under
+    * {@link #equals(Object)} also differ in their printed form.
+    */
+   @Override
+   public String toString()
+   {
+      return getParentString() +
+         ", address=" + address +
+         ", queueName=" + queueName +
+         ", filterString=" + filterString +
+         ", durable=" + durable +
+         ", temporary=" + temporary +
+         ", requiresResponse=" + requiresResponse +
+         "]";
+   }
+
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   public SimpleString getQueueName()
+   {
+      return queueName;
+   }
+
+   public SimpleString getFilterString()
+   {
+      return filterString;
+   }
+
+   public boolean isDurable()
+   {
+      return durable;
+   }
+
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
+
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
+
+   public void setAddress(SimpleString address)
+   {
+      this.address = address;
+   }
+
+   public void setQueueName(SimpleString queueName)
+   {
+      this.queueName = queueName;
+   }
+
+   public void setFilterString(SimpleString filterString)
+   {
+      this.filterString = filterString;
+   }
+
+   public void setDurable(boolean durable)
+   {
+      this.durable = durable;
+   }
+
+   public void setTemporary(boolean temporary)
+   {
+      this.temporary = temporary;
+   }
+
+   /**
+    * Writes address, queue name, nullable filter string, then the durable, temporary and
+    * requiresResponse flags, in that order.
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeSimpleString(address);
+      buffer.writeSimpleString(queueName);
+      buffer.writeNullableSimpleString(filterString);
+      buffer.writeBoolean(durable);
+      buffer.writeBoolean(temporary);
+      buffer.writeBoolean(requiresResponse);
+   }
+
+   /**
+    * Reads the fields in the same order {@link #encodeRest(HornetQBuffer)} writes them.
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      address = buffer.readSimpleString();
+      queueName = buffer.readSimpleString();
+      filterString = buffer.readNullableSimpleString();
+      durable = buffer.readBoolean();
+      temporary = buffer.readBoolean();
+      requiresResponse = buffer.readBoolean();
+   }
+
+   /**
+    * Extends the superclass hash with all six fields of this message.
+    */
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((address == null) ? 0 : address.hashCode());
+      result = prime * result + (durable ? 1231 : 1237);
+      result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
+      result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
+      result = prime * result + (requiresResponse ? 1231 : 1237);
+      result = prime * result + (temporary ? 1231 : 1237);
+      return result;
+   }
+
+   /**
+    * Equal when the superclass considers both objects equal, the other object is also a
+    * {@code CreateQueueMessage}, and all six fields match (a {@code null} reference field
+    * only matches another {@code null}).
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!super.equals(obj) || !(obj instanceof CreateQueueMessage))
+      {
+         return false;
+      }
+      CreateQueueMessage other = (CreateQueueMessage)obj;
+      if (durable != other.durable || temporary != other.temporary || requiresResponse != other.requiresResponse)
+      {
+         return false;
+      }
+      if (address == null ? other.address != null : !address.equals(other.address))
+      {
+         return false;
+      }
+      if (filterString == null ? other.filterString != null : !filterString.equals(other.filterString))
+      {
+         return false;
+      }
+      return queueName == null ? other.queueName == null : queueName.equals(other.queueName);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java
new file mode 100644
index 0000000..ad871b3
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionMessage.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Packet of type {@code CREATESESSION} carrying the session name, channel ID, version,
+ * nullable credentials, and the session's flags and sizes. It never requires confirmations.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>.
+ *
+ */
+public class CreateSessionMessage extends PacketImpl
+{
+   private String name;
+
+   private long sessionChannelID;
+
+   private int version;
+
+   private String username;
+
+   private String password;
+
+   private int minLargeMessageSize;
+
+   private boolean xa;
+
+   private boolean autoCommitSends;
+
+   private boolean autoCommitAcks;
+
+   private boolean preAcknowledge;
+
+   private int windowSize;
+
+   private String defaultAddress;
+
+   /**
+    * Creates a populated message; each argument is stored in the field of the same name.
+    * {@code username}, {@code password} and {@code defaultAddress} are encoded as nullable
+    * strings, {@code name} as a plain string.
+    */
+   public CreateSessionMessage(final String name,
+                               final long sessionChannelID,
+                               final int version,
+                               final String username,
+                               final String password,
+                               final int minLargeMessageSize,
+                               final boolean xa,
+                               final boolean autoCommitSends,
+                               final boolean autoCommitAcks,
+                               final boolean preAcknowledge,
+                               final int windowSize,
+                               final String defaultAddress)
+   {
+      this();
+
+      this.name = name;
+      this.sessionChannelID = sessionChannelID;
+      this.version = version;
+      this.username = username;
+      this.password = password;
+      this.minLargeMessageSize = minLargeMessageSize;
+      this.xa = xa;
+      this.autoCommitSends = autoCommitSends;
+      this.autoCommitAcks = autoCommitAcks;
+      this.windowSize = windowSize;
+      this.preAcknowledge = preAcknowledge;
+      this.defaultAddress = defaultAddress;
+   }
+
+   /**
+    * Creates an empty message whose fields are expected to be filled by
+    * {@link #decodeRest(HornetQBuffer)}.
+    */
+   public CreateSessionMessage()
+   {
+      super(CREATESESSION);
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getName()
+   {
+      return name;
+   }
+
+   public long getSessionChannelID()
+   {
+      return sessionChannelID;
+   }
+
+   public int getVersion()
+   {
+      return version;
+   }
+
+   public String getUsername()
+   {
+      return username;
+   }
+
+   public String getPassword()
+   {
+      return password;
+   }
+
+   public int getMinLargeMessageSize()
+   {
+      return minLargeMessageSize;
+   }
+
+   public boolean isXA()
+   {
+      return xa;
+   }
+
+   public boolean isAutoCommitSends()
+   {
+      return autoCommitSends;
+   }
+
+   public boolean isAutoCommitAcks()
+   {
+      return autoCommitAcks;
+   }
+
+   public boolean isPreAcknowledge()
+   {
+      return preAcknowledge;
+   }
+
+   public int getWindowSize()
+   {
+      return windowSize;
+   }
+
+   public String getDefaultAddress()
+   {
+      return defaultAddress;
+   }
+
+   /**
+    * Writes all twelve fields; {@link #decodeRest(HornetQBuffer)} reads them back in the
+    * same order.
+    */
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeString(name);
+      buffer.writeLong(sessionChannelID);
+      buffer.writeInt(version);
+      buffer.writeNullableString(username);
+      buffer.writeNullableString(password);
+      buffer.writeInt(minLargeMessageSize);
+      buffer.writeBoolean(xa);
+      buffer.writeBoolean(autoCommitSends);
+      buffer.writeBoolean(autoCommitAcks);
+      buffer.writeInt(windowSize);
+      buffer.writeBoolean(preAcknowledge);
+      buffer.writeNullableString(defaultAddress);
+   }
+
+   /**
+    * Reads all twelve fields in the order {@link #encodeRest(HornetQBuffer)} writes them.
+    */
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      name = buffer.readString();
+      sessionChannelID = buffer.readLong();
+      version = buffer.readInt();
+      username = buffer.readNullableString();
+      password = buffer.readNullableString();
+      minLargeMessageSize = buffer.readInt();
+      xa = buffer.readBoolean();
+      autoCommitSends = buffer.readBoolean();
+      autoCommitAcks = buffer.readBoolean();
+      windowSize = buffer.readInt();
+      preAcknowledge = buffer.readBoolean();
+      defaultAddress = buffer.readNullableString();
+   }
+
+   /**
+    * @return always {@code false} for this packet type
+    */
+   @Override
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+
+   /**
+    * Extends the superclass hash with every field of this message.
+    */
+   @Override
+   public int hashCode()
+   {
+      int hash = super.hashCode();
+      hash = 31 * hash + (autoCommitAcks ? 1231 : 1237);
+      hash = 31 * hash + (autoCommitSends ? 1231 : 1237);
+      hash = 31 * hash + (defaultAddress != null ? defaultAddress.hashCode() : 0);
+      hash = 31 * hash + minLargeMessageSize;
+      hash = 31 * hash + (name != null ? name.hashCode() : 0);
+      hash = 31 * hash + (password != null ? password.hashCode() : 0);
+      hash = 31 * hash + (preAcknowledge ? 1231 : 1237);
+      // Fold the 64-bit channel ID into 32 bits by XOR-ing its upper and lower halves.
+      hash = 31 * hash + (int)(sessionChannelID ^ (sessionChannelID >>> 32));
+      hash = 31 * hash + (username != null ? username.hashCode() : 0);
+      hash = 31 * hash + version;
+      hash = 31 * hash + windowSize;
+      hash = 31 * hash + (xa ? 1231 : 1237);
+      return hash;
+   }
+
+   /**
+    * Equal when the superclass considers both objects equal, the other object is also a
+    * {@code CreateSessionMessage}, and every field matches (a {@code null} string only
+    * matches another {@code null}).
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!super.equals(obj) || !(obj instanceof CreateSessionMessage))
+      {
+         return false;
+      }
+      final CreateSessionMessage that = (CreateSessionMessage)obj;
+
+      final boolean primitivesMatch = autoCommitAcks == that.autoCommitAcks &&
+         autoCommitSends == that.autoCommitSends &&
+         minLargeMessageSize == that.minLargeMessageSize &&
+         preAcknowledge == that.preAcknowledge &&
+         sessionChannelID == that.sessionChannelID &&
+         version == that.version &&
+         windowSize == that.windowSize &&
+         xa == that.xa;
+      if (!primitivesMatch)
+      {
+         return false;
+      }
+
+      if (defaultAddress == null ? that.defaultAddress != null : !defaultAddress.equals(that.defaultAddress))
+      {
+         return false;
+      }
+      if (name == null ? that.name != null : !name.equals(that.name))
+      {
+         return false;
+      }
+      if (password == null ? that.password != null : !password.equals(that.password))
+      {
+         return false;
+      }
+      return username == null ? that.username == null : username.equals(that.username);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java
new file mode 100644
index 0000000..76a5f54
--- /dev/null
+++ b/activemq6-core-client/src/main/java/org/apache/activemq6/core/protocol/core/impl/wireformat/CreateSessionResponseMessage.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Response packet of type {@code CREATESESSION_RESP} whose only payload is the server
+ * version, encoded as a single int. It never requires confirmations.
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>.
+ *
+ */
+public class CreateSessionResponseMessage extends PacketImpl
+{
+   private int serverVersion;
+
+   /**
+    * Creates a response carrying the given server version.
+    *
+    * @param serverVersion version value written by {@link #encodeRest(HornetQBuffer)}
+    */
+   public CreateSessionResponseMessage(final int serverVersion)
+   {
+      this();
+
+      this.serverVersion = serverVersion;
+   }
+
+   /**
+    * Creates an empty response whose version is expected to be filled by
+    * {@link #decodeRest(HornetQBuffer)}.
+    */
+   public CreateSessionResponseMessage()
+   {
+      super(CREATESESSION_RESP);
+   }
+
+   /**
+    * @return always {@code true}
+    */
+   @Override
+   public boolean isResponse()
+   {
+      return true;
+   }
+
+   public int getServerVersion()
+   {
+      return serverVersion;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(serverVersion);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      serverVersion = buffer.readInt();
+   }
+
+   /**
+    * @return always {@code false} for this packet type
+    */
+   @Override
+   public final boolean isRequiresConfirmations()
+   {
+      return false;
+   }
+
+   /**
+    * Extends the superclass hash with the server version.
+    */
+   @Override
+   public int hashCode()
+   {
+      return 31 * super.hashCode() + serverVersion;
+   }
+
+   /**
+    * Equal when the superclass considers both objects equal, the other object is also a
+    * {@code CreateSessionResponseMessage}, and the server versions match.
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+      {
+         return true;
+      }
+      if (!super.equals(obj) || !(obj instanceof CreateSessionResponseMessage))
+      {
+         return false;
+      }
+      return serverVersion == ((CreateSessionResponseMessage)obj).serverVersion;
+   }
+}


Mime
View raw message