activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [45/52] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 Rename HornetQ* classes to ActiveMQ*
Date Tue, 18 Nov 2014 23:38:38 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
new file mode 100644
index 0000000..4e3d34c
--- /dev/null
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -0,0 +1,940 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.core.protocol.core.impl;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.apache.activemq.api.core.ActiveMQBuffer;
+import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.ActiveMQExceptionType;
+import org.apache.activemq.api.core.Message;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.core.client.ActiveMQClientLogger;
+import org.apache.activemq.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.core.client.impl.AddressQueryImpl;
+import org.apache.activemq.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.core.client.impl.ClientConsumerInternal;
+import org.apache.activemq.core.client.impl.ClientLargeMessageInternal;
+import org.apache.activemq.core.client.impl.ClientMessageInternal;
+import org.apache.activemq.core.client.impl.ClientProducerCreditsImpl;
+import org.apache.activemq.core.client.impl.ClientSessionImpl;
+import org.apache.activemq.core.message.impl.MessageInternal;
+import org.apache.activemq.core.protocol.core.Channel;
+import org.apache.activemq.core.protocol.core.ChannelHandler;
+import org.apache.activemq.core.protocol.core.CommandConfirmationHandler;
+import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.core.protocol.core.Packet;
+import org.apache.activemq.core.protocol.core.impl.wireformat.CreateQueueMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.RollbackMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionExpireMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.spi.core.remoting.Connection;
+import org.apache.activemq.spi.core.remoting.SessionContext;
+import org.apache.activemq.utils.TokenBucketLimiterImpl;
+
+import static org.apache.activemq.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
+import static org.apache.activemq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ActiveMQSessionContext extends SessionContext
+{
+   private final Channel sessionChannel;
+   private final int serverVersion;
+   private int confirmationWindow;
+   private final String name;
+
+
+   public ActiveMQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow)
+   {
+      super(remotingConnection);
+
+      this.name = name;
+      this.sessionChannel = sessionChannel;
+      this.serverVersion = serverVersion;
+      this.confirmationWindow = confirmationWindow;
+
+      ChannelHandler handler = new ClientSessionPacketHandler();
+      sessionChannel.setHandler(handler);
+
+
+      if (confirmationWindow >= 0)
+      {
+         sessionChannel.setCommandConfirmationHandler(confirmationHandler);
+      }
+   }
+
+
+   private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler()
+   {
+      public void commandConfirmed(final Packet packet)
+      {
+         if (packet.getType() == PacketImpl.SESS_SEND)
+         {
+            SessionSendMessage ssm = (SessionSendMessage) packet;
+            callSendAck(ssm.getHandler(), ssm.getMessage());
+         }
+         else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+         {
+            SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
+            if (!scm.isContinues())
+            {
+               callSendAck(scm.getHandler(), scm.getMessage());
+            }
+         }
+      }
+
+      private void callSendAck(SendAcknowledgementHandler handler, final Message message)
+      {
+         if (handler != null)
+         {
+            handler.sendAcknowledged(message);
+         }
+         else if (sendAckHandler != null)
+         {
+            sendAckHandler.sendAcknowledged(message);
+         }
+      }
+
+   };
+
+
+   // Failover utility methods
+
+   /**
+    * Forwards {@code cause} to {@code Channel.returnBlocking} on the session channel.
+    *
+    * @param cause the exception handed to the channel
+    */
+   @Override
+   public void returnBlocking(final ActiveMQException cause)
+   {
+      this.sessionChannel.returnBlocking(cause);
+   }
+
+   /**
+    * Locks the session channel; undone by {@link #releaseCommunications()}.
+    */
+   @Override
+   public void lockCommunications()
+   {
+      this.sessionChannel.lock();
+   }
+
+   /**
+    * Clears the channel's transferring flag and then unlocks the channel.
+    */
+   @Override
+   public void releaseCommunications()
+   {
+      final Channel channel = this.sessionChannel;
+      channel.setTransferring(false);
+      channel.unlock();
+   }
+
+   /**
+    * Closes the session channel and then calls {@code returnBlocking()} on it.
+    */
+   public void cleanup()
+   {
+      final Channel channel = this.sessionChannel;
+
+      channel.close();
+
+      // When the server is the one sending the disconnect, a pending blocked
+      // operation could hang unless the channel is told to return it.
+      channel.returnBlocking();
+   }
+
+   /**
+    * Intentionally empty for this implementation.
+    *
+    * @param address               unused here
+    * @param clientProducerCredits unused here
+    */
+   @Override
+   public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits)
+   {
+      // nothing to be done here... Flow control here is done on the core side
+   }
+
+
+   /**
+    * Registers the command confirmation handler on the session channel and stores
+    * {@code handler} as the session-wide send acknowledgement handler.
+    *
+    * @param handler the handler stored in {@code sendAckHandler}
+    */
+   public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler)
+   {
+      // registration happens first, exactly as before; the field is assigned afterwards
+      this.sessionChannel.setCommandConfirmationHandler(this.confirmationHandler);
+      sendAckHandler = handler;
+   }
+
+   /**
+    * Sends a {@link CreateSharedQueueMessage} with a blocking send.
+    *
+    * @param address      address passed to the request
+    * @param queueName    queue name passed to the request
+    * @param filterString filter passed to the request
+    * @param durable      durability flag passed to the request
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void createSharedQueue(SimpleString address,
+                                 SimpleString queueName,
+                                 SimpleString filterString,
+                                 boolean durable) throws ActiveMQException
+   {
+      final CreateSharedQueueMessage request =
+         new CreateSharedQueueMessage(address, queueName, filterString, durable, true);
+
+      sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends a {@link SessionDeleteQueueMessage} for {@code queueName} with a blocking send.
+    *
+    * @param queueName name placed in the delete request
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void deleteQueue(final SimpleString queueName) throws ActiveMQException
+   {
+      final SessionDeleteQueueMessage request = new SessionDeleteQueueMessage(queueName);
+      sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends a queue query for {@code queueName} and converts the response.
+    *
+    * @param queueName the queue to query
+    * @return the response converted with {@code toQueueQuery()}
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException
+   {
+      final Packet reply = sessionChannel.sendBlocking(new SessionQueueQueryMessage(queueName), PacketImpl.SESS_QUEUEQUERY_RESP);
+
+      return ((SessionQueueQueryResponseMessage) reply).toQueueQuery();
+   }
+
+
+   /**
+    * Creates a consumer on the server and returns its client-side object.
+    * <p>
+    * A new consumer id is generated, a {@link SessionCreateConsumerMessage} is sent with a
+    * blocking send, and the resulting queue information is passed to the new
+    * {@link ClientConsumerImpl}.
+    *
+    * @param queueName           queue to consume from
+    * @param filterString        filter passed to the server request and to the consumer
+    * @param windowSize          requested window size; run through {@code calcWindowSize} first
+    * @param maxRate             when greater than zero a {@link TokenBucketLimiterImpl} is created with it
+    * @param ackBatchSize        passed to the consumer unchanged
+    * @param browseOnly          passed to the server request and to the consumer
+    * @param executor            passed to the consumer unchanged
+    * @param flowControlExecutor passed to the consumer unchanged
+    * @return the new client consumer
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString,
+                                                int windowSize, int maxRate, int ackBatchSize, boolean browseOnly,
+                                                Executor executor, Executor flowControlExecutor) throws ActiveMQException
+   {
+      final long newConsumerID = idGenerator.generateID();
+      final ActiveMQConsumerContext context = new ActiveMQConsumerContext(newConsumerID);
+
+      final SessionCreateConsumerMessage createRequest =
+         new SessionCreateConsumerMessage(newConsumerID, queueName, filterString, browseOnly, true);
+
+      final SessionQueueQueryResponseMessage queueReply =
+         (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(createRequest, PacketImpl.SESS_QUEUEQUERY_RESP);
+
+      // The window size sent here is only a hint: the size actually used can be
+      // overridden through the queue settings.
+
+      // a rate limiter is only needed when a positive rate was requested
+      TokenBucketLimiterImpl rateLimiter = null;
+      if (maxRate > 0)
+      {
+         rateLimiter = new TokenBucketLimiterImpl(maxRate, false);
+      }
+
+      return new ClientConsumerImpl(session,
+                                    context,
+                                    queueName,
+                                    filterString,
+                                    browseOnly,
+                                    calcWindowSize(windowSize),
+                                    ackBatchSize,
+                                    rateLimiter,
+                                    executor,
+                                    flowControlExecutor,
+                                    this,
+                                    queueReply.toQueueQuery(),
+                                    lookupTCCL());
+   }
+
+
+   /**
+    * @return the server version value supplied to the constructor
+    */
+   public int getServerVersion()
+   {
+      return this.serverVersion;
+   }
+
+   /**
+    * Sends a binding query for {@code address} and wraps the answer.
+    *
+    * @param address the address to query
+    * @return an {@link AddressQueryImpl} built from the response's exists flag and queue names
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException
+   {
+      final SessionBindingQueryMessage request = new SessionBindingQueryMessage(address);
+      final Packet reply = sessionChannel.sendBlocking(request, PacketImpl.SESS_BINDINGQUERY_RESP);
+      final SessionBindingQueryResponseMessage bindings = (SessionBindingQueryResponseMessage) reply;
+
+      return new AddressQueryImpl(bindings.isExists(), bindings.getQueueNames());
+   }
+
+
+   /**
+    * Sends a {@link SessionConsumerCloseMessage} for the consumer with a blocking send.
+    *
+    * @param consumer the consumer whose id is placed in the request
+    * @throws ActiveMQException if the blocking send fails
+    */
+   @Override
+   public void closeConsumer(final ClientConsumer consumer) throws ActiveMQException
+   {
+      final SessionConsumerCloseMessage request = new SessionConsumerCloseMessage(getConsumerID(consumer));
+      sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends a {@link SessionConsumerFlowCreditMessage} through the channel's plain {@code send}.
+    *
+    * @param consumer the consumer whose id is placed in the packet
+    * @param credits  the credit amount placed in the packet
+    */
+   public void sendConsumerCredits(final ClientConsumer consumer, final int credits)
+   {
+      final SessionConsumerFlowCreditMessage creditPacket =
+         new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits);
+
+      sessionChannel.send(creditPacket);
+   }
+
+   /**
+    * Sends a {@link SessionForceConsumerDelivery} packet through the channel's plain {@code send}.
+    *
+    * @param consumer the consumer whose id is placed in the packet
+    * @param sequence the sequence value placed in the packet
+    * @throws ActiveMQException declared by the signature; not thrown explicitly here
+    */
+   public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException
+   {
+      sessionChannel.send(new SessionForceConsumerDelivery(getConsumerID(consumer), sequence));
+   }
+
+   /**
+    * Sends a SESS_COMMIT packet with a blocking send.
+    *
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void simpleCommit() throws ActiveMQException
+   {
+      final PacketImpl commit = new PacketImpl(PacketImpl.SESS_COMMIT);
+      sessionChannel.sendBlocking(commit, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends a {@link RollbackMessage} with a blocking send.
+    *
+    * @param lastMessageAsDelivered flag carried by the rollback packet
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException
+   {
+      final RollbackMessage rollback = new RollbackMessage(lastMessageAsDelivered);
+      sessionChannel.sendBlocking(rollback, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends a SESS_START packet through the channel's plain {@code send}.
+    *
+    * @throws ActiveMQException declared by the signature; not thrown explicitly here
+    */
+   public void sessionStart() throws ActiveMQException
+   {
+      final PacketImpl start = new PacketImpl(PacketImpl.SESS_START);
+      sessionChannel.send(start);
+   }
+
+   /**
+    * Sends a SESS_STOP packet with a blocking send.
+    *
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void sessionStop() throws ActiveMQException
+   {
+      final PacketImpl stop = new PacketImpl(PacketImpl.SESS_STOP);
+      sessionChannel.sendBlocking(stop, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends one metadata pair as a {@link SessionAddMetaDataMessageV2} with a blocking send.
+    *
+    * @param key  metadata key
+    * @param data metadata value
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void addSessionMetadata(String key, String data) throws ActiveMQException
+   {
+      final SessionAddMetaDataMessageV2 metadata = new SessionAddMetaDataMessageV2(key, data);
+      sessionChannel.sendBlocking(metadata, PacketImpl.NULL_RESPONSE);
+   }
+
+
+   /**
+    * Sends one metadata pair as a {@link SessionUniqueAddMetaDataMessage} with a blocking send.
+    *
+    * @param key  metadata key
+    * @param data metadata value
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void addUniqueMetaData(String key, String data) throws ActiveMQException
+   {
+      final SessionUniqueAddMetaDataMessage metadata = new SessionUniqueAddMetaDataMessage(key, data);
+      sessionChannel.sendBlocking(metadata, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends an XA commit for {@code xid} and checks the XA response.
+    *
+    * @param xid      the transaction branch to commit
+    * @param onePhase flag carried by the commit packet
+    * @throws XAException       with the response code when the response reports an error
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException
+   {
+      final SessionXAResponseMessage reply =
+         (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXACommitMessage(xid, onePhase), PacketImpl.SESS_XA_RESP);
+
+      if (reply.isError())
+      {
+         throw new XAException(reply.getResponseCode());
+      }
+
+      final boolean tracing = ActiveMQClientLogger.LOGGER.isTraceEnabled();
+      if (tracing)
+      {
+         ActiveMQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + reply);
+      }
+   }
+
+   public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException
+   {
+      Packet packet;
+      if (flags == XAResource.TMSUSPEND)
+      {
+         packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+      }
+      else if (flags == XAResource.TMSUCCESS)
+      {
+         packet = new SessionXAEndMessage(xid, false);
+      }
+      else if (flags == XAResource.TMFAIL)
+      {
+         packet = new SessionXAEndMessage(xid, true);
+      }
+      else
+      {
+         throw new XAException(XAException.XAER_INVAL);
+      }
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+   }
+
+
+   /**
+    * Sends a {@link SessionRequestProducerCreditsMessage} through the channel's plain {@code send}.
+    *
+    * @param credits the credit amount placed in the packet
+    * @param address the address placed in the packet
+    */
+   public void sendProducerCreditsMessage(final int credits, final SimpleString address)
+   {
+      final SessionRequestProducerCreditsMessage creditRequest =
+         new SessionRequestProducerCreditsMessage(credits, address);
+
+      sessionChannel.send(creditRequest);
+   }
+
+   /**
+    * Reports whether this session context supports large messages.
+    *
+    * @return always {@code true} for this implementation
+    */
+   public boolean supportsLargeMessage()
+   {
+      return true;
+   }
+
+   /**
+    * @param msgI the message about to be sent
+    * @return the message's encode size, as reported by {@code getEncodeSize()}
+    */
+   @Override
+   public int getCreditsOnSendingFull(MessageInternal msgI)
+   {
+      final int encodeSize = msgI.getEncodeSize();
+      return encodeSize;
+   }
+
+   public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException
+   {
+      SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler);
+
+      if (sendBlocking)
+      {
+         sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
+      }
+      else
+      {
+         sessionChannel.sendBatched(packet);
+      }
+   }
+
+   /**
+    * Sends the opening {@link SessionSendLargeMessage} packet of a large message.
+    *
+    * @param msgI the large message
+    * @return the message's headers-and-properties encode size
+    * @throws ActiveMQException declared by the signature; not thrown explicitly here
+    */
+   @Override
+   public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException
+   {
+      sessionChannel.send(new SessionSendLargeMessage(msgI));
+
+      return msgI.getHeadersAndPropertiesEncodeSize();
+   }
+
+   /**
+    * Sends one body chunk of a large message as a {@link SessionSendContinuationMessage}.
+    *
+    * @param msgI            the large message the chunk belongs to
+    * @param messageBodySize body size value carried by the packet
+    * @param sendBlocking    whether the caller wants blocking sends
+    * @param lastChunk       whether this is the final chunk
+    * @param chunk           the chunk bytes
+    * @param messageHandler  acknowledgement handler attached to the packet
+    * @return the packet size reported by the continuation packet after sending
+    * @throws ActiveMQException if the blocking send fails
+    */
+   @Override
+   public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException
+   {
+      // Even for a blocking send, only the final chunk waits for a response.
+      final boolean awaitResponse = sendBlocking && lastChunk;
+      final boolean moreChunksFollow = !lastChunk;
+
+      final SessionSendContinuationMessage continuation =
+         new SessionSendContinuationMessage(msgI, chunk, moreChunksFollow, awaitResponse, messageBodySize, messageHandler);
+
+      if (!awaitResponse)
+      {
+         sessionChannel.send(continuation);
+      }
+      else
+      {
+         sessionChannel.sendBlocking(continuation, PacketImpl.NULL_RESPONSE);
+      }
+
+      return continuation.getPacketSize();
+   }
+
+   public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws ActiveMQException
+   {
+      PacketImpl messagePacket;
+      if (individual)
+      {
+         messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
+      }
+      else
+      {
+         messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);
+      }
+
+      if (block)
+      {
+         sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
+      }
+      else
+      {
+         sessionChannel.sendBatched(messagePacket);
+      }
+   }
+
+   /**
+    * Sends a {@link SessionExpireMessage} through the channel's plain {@code send}.
+    *
+    * @param consumer the consumer whose id is placed in the packet
+    * @param message  the message whose id is placed in the packet
+    * @throws ActiveMQException declared by the signature; not thrown explicitly here
+    */
+   public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException
+   {
+      sessionChannel.send(new SessionExpireMessage(getConsumerID(consumer), message.getMessageID()));
+   }
+
+
+   /**
+    * Sends a {@link SessionCloseMessage} with a blocking send.
+    *
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void sessionClose() throws ActiveMQException
+   {
+      final SessionCloseMessage close = new SessionCloseMessage();
+      sessionChannel.sendBlocking(close, PacketImpl.NULL_RESPONSE);
+   }
+
+   /**
+    * Sends an XA forget for {@code xid} and checks the XA response.
+    *
+    * @param xid the transaction branch
+    * @throws XAException       with the response code when the response reports an error
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void xaForget(Xid xid) throws XAException, ActiveMQException
+   {
+      final SessionXAForgetMessage request = new SessionXAForgetMessage(xid);
+      final SessionXAResponseMessage reply = (SessionXAResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_XA_RESP);
+
+      if (reply.isError())
+      {
+         throw new XAException(reply.getResponseCode());
+      }
+   }
+
+   public int xaPrepare(Xid xid) throws XAException, ActiveMQException
+   {
+      SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid);
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         throw new XAException(response.getResponseCode());
+      }
+      else
+      {
+         return response.getResponseCode();
+      }
+   }
+
+   /**
+    * Requests the in-doubt Xids with a blocking send.
+    *
+    * @return the Xids from the response, copied into a new array
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public Xid[] xaScan() throws ActiveMQException
+   {
+      final PacketImpl request = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
+      final SessionXAGetInDoubtXidsResponseMessage reply =
+         (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_XA_INDOUBT_XIDS_RESP);
+
+      final List<Xid> inDoubt = reply.getXids();
+
+      return inDoubt.toArray(new Xid[inDoubt.size()]);
+   }
+
+   /**
+    * Sends an XA rollback for {@code xid} and checks the XA response.
+    *
+    * @param xid        the transaction branch
+    * @param wasStarted not used by this implementation
+    * @throws ActiveMQException if the blocking send fails
+    * @throws XAException       with the response code when the response reports an error
+    */
+   public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException
+   {
+      final SessionXAResponseMessage reply =
+         (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXARollbackMessage(xid), PacketImpl.SESS_XA_RESP);
+
+      if (reply.isError())
+      {
+         throw new XAException(reply.getResponseCode());
+      }
+   }
+
+   public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException
+   {
+      Packet packet;
+      if (flags == XAResource.TMJOIN)
+      {
+         packet = new SessionXAJoinMessage(xid);
+      }
+      else if (flags == XAResource.TMRESUME)
+      {
+         packet = new SessionXAResumeMessage(xid);
+      }
+      else if (flags == XAResource.TMNOFLAGS)
+      {
+         // Don't need to flush since the previous end will have done this
+         packet = new SessionXAStartMessage(xid);
+      }
+      else
+      {
+         throw new XAException(XAException.XAER_INVAL);
+      }
+
+      SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP);
+
+      if (response.isError())
+      {
+         ActiveMQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode());
+         throw new XAException(response.getResponseCode());
+      }
+   }
+
+   /**
+    * Sends an XA set-timeout request with a blocking send.
+    *
+    * @param seconds the timeout value placed in the request
+    * @return the response's {@code isOK()} flag
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public boolean configureTransactionTimeout(int seconds) throws ActiveMQException
+   {
+      final SessionXASetTimeoutMessage request = new SessionXASetTimeoutMessage(seconds);
+      final Packet reply = sessionChannel.sendBlocking(request, PacketImpl.SESS_XA_SET_TIMEOUT_RESP);
+
+      return ((SessionXASetTimeoutResponseMessage) reply).isOK();
+   }
+
+   /**
+    * Sends an XA get-timeout request with a blocking send.
+    *
+    * @return the timeout in seconds reported by the response
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public int recoverSessionTimeout() throws ActiveMQException
+   {
+      final PacketImpl request = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
+      final Packet reply = sessionChannel.sendBlocking(request, PacketImpl.SESS_XA_GET_TIMEOUT_RESP);
+
+      return ((SessionXAGetTimeoutResponseMessage) reply).getTimeoutSeconds();
+   }
+
+   /**
+    * Sends a {@link CreateQueueMessage} with a blocking send.
+    *
+    * @param address      address passed to the request
+    * @param queueName    queue name passed to the request
+    * @param filterString filter passed to the request
+    * @param durable      durability flag passed to the request
+    * @param temp         temporary flag passed to the request
+    * @throws ActiveMQException if the blocking send fails
+    */
+   public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws ActiveMQException
+   {
+      sessionChannel.sendBlocking(new CreateQueueMessage(address, queueName, filterString, durable, temp, true),
+                                  PacketImpl.NULL_RESPONSE);
+   }
+
+   @Override
+   public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException
+   {
+
+      this.remotingConnection = newConnection;
+
+      sessionChannel.transferConnection((CoreRemotingConnection) newConnection);
+
+      Packet request = new ReattachSessionMessage(name, sessionChannel.getLastConfirmedCommandID());
+
+      Channel channel1 = getCoreConnection().getChannel(1, -1);
+
+      ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP);
+
+      if (response.isReattached())
+      {
+         if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
+         {
+            ActiveMQClientLogger.LOGGER.debug("ClientSession reattached fine, replaying commands");
+         }
+         // The session was found on the server - we reattached transparently ok
+
+         sessionChannel.replayCommands(response.getLastConfirmedCommandID());
+
+         return true;
+      }
+      else
+      {
+
+         sessionChannel.clearCommands();
+
+         return false;
+      }
+
+   }
+
+   /**
+    * Re-creates this session on the server with a {@link CreateSessionMessage}.
+    * <p>
+    * The request is retried, after a 10 ms pause, for as long as the server answers with
+    * {@code SESSION_CREATION_REJECTED} and the session is not closing. Any other
+    * {@link ActiveMQException} is rethrown at once.
+    *
+    * @param username            passed to the create request
+    * @param password            passed to the create request
+    * @param minLargeMessageSize passed to the create request
+    * @param xa                  passed to the create request
+    * @param autoCommitSends     passed to the create request
+    * @param autoCommitAcks      passed to the create request
+    * @param preAcknowledge      passed to the create request
+    * @param defaultAddress      converted to a String for the request; may be {@code null}
+    * @throws ActiveMQException on any failure other than a rejected creation, or the rejection
+    *                           itself when the pause between retries is interrupted
+    */
+   public void recreateSession(final String username,
+                               final String password,
+                               final int minLargeMessageSize,
+                               final boolean xa,
+                               final boolean autoCommitSends,
+                               final boolean autoCommitAcks,
+                               final boolean preAcknowledge,
+                               final SimpleString defaultAddress) throws ActiveMQException
+   {
+      String defaultAddressName = null;
+      if (defaultAddress != null)
+      {
+         defaultAddressName = defaultAddress.toString();
+      }
+
+      final Packet createRequest = new CreateSessionMessage(name,
+                                                            sessionChannel.getID(),
+                                                            getServerVersion(),
+                                                            username,
+                                                            password,
+                                                            minLargeMessageSize,
+                                                            xa,
+                                                            autoCommitSends,
+                                                            autoCommitAcks,
+                                                            preAcknowledge,
+                                                            confirmationWindow,
+                                                            defaultAddressName);
+
+      while (true)
+      {
+         try
+         {
+            getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP);
+            return;
+         }
+         catch (ActiveMQException e)
+         {
+            if (e.getType() != ActiveMQExceptionType.SESSION_CREATION_REJECTED)
+            {
+               throw e;
+            }
+
+            // the session was created while its server was starting, so try again
+            ActiveMQClientLogger.LOGGER.retryCreateSessionSeverStarting(name);
+
+            // short pause so the retry loop does not spin
+            try
+            {
+               Thread.sleep(10);
+            }
+            catch (InterruptedException interrupted)
+            {
+               Thread.currentThread().interrupt();
+               throw e;
+            }
+         }
+
+         // a rejected attempt is only repeated while the session is still open
+         if (session.isClosing())
+         {
+            return;
+         }
+      }
+   }
+
+   @Override
+   public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException
+   {
+      ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo();
+
+      // We try and recreate any non durable queues, since they probably won't be there unless
+      // they are defined in hornetq-configuration.xml
+      // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
+      if (!queueInfo.isDurable())
+      {
+         CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
+                                                                        queueInfo.getName(),
+                                                                        queueInfo.getFilterString(),
+                                                                        false,
+                                                                        queueInfo.isTemporary(),
+                                                                        false);
+
+         sendPacketWithoutLock(sessionChannel, createQueueRequest);
+      }
+
+      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal),
+                                                                                            consumerInternal.getQueueName(),
+                                                                                            consumerInternal.getFilterString(),
+                                                                                            consumerInternal.isBrowseOnly(),
+                                                                                            false);
+
+      sendPacketWithoutLock(sessionChannel, createConsumerRequest);
+
+      int clientWindowSize = consumerInternal.getClientWindowSize();
+
+      if (clientWindowSize != 0)
+      {
+         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
+                                                                                        clientWindowSize);
+
+         sendPacketWithoutLock(sessionChannel, packet);
+      }
+      else
+      {
+         // https://jira.jboss.org/browse/HORNETQ-522
+         SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal),
+                                                                                        1);
+         sendPacketWithoutLock(sessionChannel, packet);
+      }
+   }
+
+   /**
+    * Sends a {@link SessionXAAfterFailedMessage} for {@code xid} using the lock-free send helper.
+    *
+    * @param xid the transaction branch placed in the packet
+    * @throws ActiveMQException declared by the signature
+    */
+   public void xaFailed(Xid xid) throws ActiveMQException
+   {
+      final SessionXAAfterFailedMessage afterFailed = new SessionXAAfterFailedMessage(xid);
+      sendPacketWithoutLock(sessionChannel, afterFailed);
+   }
+
+   /**
+    * Sends a SESS_START packet using the lock-free send helper.
+    *
+    * @throws ActiveMQException declared by the signature
+    */
+   public void restartSession() throws ActiveMQException
+   {
+      final PacketImpl start = new PacketImpl(PacketImpl.SESS_START);
+      sendPacketWithoutLock(sessionChannel, start);
+   }
+
+   /**
+    * Re-sends every metadata pair after failover, one {@link SessionAddMetaDataMessageV2}
+    * per map entry, using the lock-free send helper.
+    *
+    * @param metaDataToSend the key/value pairs to send again
+    */
+   @Override
+   public void resetMetadata(HashMap<String, String> metaDataToSend)
+   {
+      for (final Map.Entry<String, String> pair : metaDataToSend.entrySet())
+      {
+         final SessionAddMetaDataMessageV2 metadata =
+            new SessionAddMetaDataMessageV2(pair.getKey(), pair.getValue(), false);
+
+         sendPacketWithoutLock(sessionChannel, metadata);
+      }
+   }
+
+
+   private Channel getCreateChannel()
+   {
+      return getCoreConnection().getChannel(1, -1);
+   }
+
+   private CoreRemotingConnection getCoreConnection()
+   {
+      return (CoreRemotingConnection) remotingConnection;
+   }
+
+
+   /**
+    * This doesn't apply to other protocols probably, so it will be an ActiveMQ exclusive feature
+    *
+    * @throws org.apache.activemq.api.core.ActiveMQException
+    */
+   private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws ActiveMQException
+   {
+      DisconnectConsumerMessage message = packet;
+
+      session.handleConsumerDisconnect(new ActiveMQConsumerContext(message.getConsumerId()));
+   }
+
+   private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception
+   {
+      ClientMessageInternal msgi = (ClientMessageInternal) messagePacket.getMessage();
+
+      msgi.setDeliveryCount(messagePacket.getDeliveryCount());
+
+      msgi.setFlowControlSize(messagePacket.getPacketSize());
+
+      handleReceiveMessage(new ActiveMQConsumerContext(messagePacket.getConsumerID()), msgi);
+   }
+
+   private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception
+   {
+      ClientLargeMessageInternal clientLargeMessage = (ClientLargeMessageInternal) serverPacket.getLargeMessage();
+
+      clientLargeMessage.setFlowControlSize(serverPacket.getPacketSize());
+
+      clientLargeMessage.setDeliveryCount(serverPacket.getDeliveryCount());
+
+      handleReceiveLargeMessage(new ActiveMQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize());
+   }
+
+
+   private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception
+   {
+      handleReceiveContinuation(new ActiveMQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(),
+                                continuationPacket.isContinues());
+   }
+
+
+   /**
+    * Unpacks a producer-credits message and delegates to the two-argument
+    * {@code handleReceiveProducerCredits} with its address and credits.
+    *
+    * @param message the received credits message
+    */
+   protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message)
+   {
+      handleReceiveProducerCredits(
+         message.getAddress(),
+         message.getCredits());
+   }
+
+
+   /**
+    * Unpacks a producer fail-credits message and delegates to the two-argument
+    * {@code handleReceiveProducerFailCredits} with its address and credits.
+    *
+    * @param message the received fail-credits message
+    */
+   protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message)
+   {
+      handleReceiveProducerFailCredits(
+         message.getAddress(),
+         message.getCredits());
+   }
+
+   class ClientSessionPacketHandler implements ChannelHandler
+   {
+
+      public void handlePacket(final Packet packet)
+      {
+         byte type = packet.getType();
+
+         try
+         {
+            switch (type)
+            {
+               case DISCONNECT_CONSUMER:
+               {
+                  handleConsumerDisconnected((DisconnectConsumerMessage) packet);
+                  break;
+               }
+               case SESS_RECEIVE_CONTINUATION:
+               {
+                  handleReceiveContinuation((SessionReceiveContinuationMessage) packet);
+
+                  break;
+               }
+               case SESS_RECEIVE_MSG:
+               {
+                  handleReceivedMessagePacket((SessionReceiveMessage) packet);
+
+                  break;
+               }
+               case SESS_RECEIVE_LARGE_MSG:
+               {
+                  handleReceiveLargeMessage((SessionReceiveLargeMessage) packet);
+
+                  break;
+               }
+               case PacketImpl.SESS_PRODUCER_CREDITS:
+               {
+                  handleReceiveProducerCredits((SessionProducerCreditsMessage) packet);
+
+                  break;
+               }
+               case PacketImpl.SESS_PRODUCER_FAIL_CREDITS:
+               {
+                  handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet);
+
+                  break;
+               }
+               case EXCEPTION:
+               {
+                  // We can only log these exceptions
+                  // maybe we should cache it on SessionContext and throw an exception on any next calls
+                  ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet;
+
+                  ActiveMQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException());
+
+                  break;
+               }
+               default:
+               {
+                  throw new IllegalStateException("Invalid packet: " + type);
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            ActiveMQClientLogger.LOGGER.failedToHandlePacket(e);
+         }
+
+         sessionChannel.confirm(packet);
+      }
+   }
+
+   private long getConsumerID(ClientConsumer consumer)
+   {
+      return ((ActiveMQConsumerContext)consumer.getConsumerContext()).getId();
+   }
+
+   /**
+    * Fetches the current thread's context class loader inside a privileged action.
+    *
+    * @return the value of {@code Thread.currentThread().getContextClassLoader()}
+    */
+   private ClassLoader lookupTCCL()
+   {
+      final PrivilegedAction<ClassLoader> readContextLoader = new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            final Thread caller = Thread.currentThread();
+            return caller.getContextClassLoader();
+         }
+      };
+
+      return AccessController.doPrivileged(readContextLoader);
+   }
+
+   /**
+    * Derives the client-side window size from the configured window size.
+    * <ul>
+    * <li>{@code -1}: no flow control - the buffer can increase without bound, so only use it
+    * with caution for very fast consumers; returned unchanged</li>
+    * <li>{@code 0}: slow consumer, no buffering; returned unchanged</li>
+    * <li>{@code 1}: slow consumer, buffer of 1; returned unchanged</li>
+    * <li>greater than {@code 1}: the client window is half the server window</li>
+    * </ul>
+    *
+    * @param windowSize the configured window size
+    * @return the client window size
+    */
+   private int calcWindowSize(final int windowSize)
+   {
+      if (windowSize < -1)
+      {
+         // Anything below -1 has no defined meaning
+         throw ActiveMQClientMessageBundle.BUNDLE.invalidWindowSize(windowSize);
+      }
+
+      if (windowSize <= 1)
+      {
+         // -1, 0 and 1 are used as they are
+         return windowSize;
+      }
+
+      // Client window size is half server window size
+      return windowSize >> 1;
+   }
+
+
+   private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet)
+   {
+      packet.setChannelID(parameterChannel.getID());
+
+      Connection conn = parameterChannel.getConnection().getTransportConnection();
+
+      ActiveMQBuffer buffer = packet.encode(this.getCoreConnection());
+
+      conn.write(buffer, false, false);
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
index c5709b4..de7a3ef 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
@@ -25,14 +25,14 @@ import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQException;
 import org.apache.activemq.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.api.core.Interceptor;
-import org.apache.activemq.core.client.HornetQClientLogger;
-import org.apache.activemq.core.client.HornetQClientMessageBundle;
+import org.apache.activemq.core.client.ActiveMQClientLogger;
+import org.apache.activemq.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.core.protocol.core.Channel;
 import org.apache.activemq.core.protocol.core.ChannelHandler;
 import org.apache.activemq.core.protocol.core.CommandConfirmationHandler;
 import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.core.protocol.core.Packet;
-import org.apache.activemq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.apache.activemq.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 
@@ -83,7 +83,7 @@ public final class ChannelImpl implements Channel
       }
    }
 
-   private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled();
+   private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
 
    private volatile long id;
 
@@ -193,7 +193,7 @@ public final class ChannelImpl implements Channel
 
       try
       {
-         response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall(cause));
+         response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause));
 
          sendCondition.signal();
       }
@@ -237,7 +237,7 @@ public final class ChannelImpl implements Channel
 
          if (isTrace)
          {
-            HornetQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
+            ActiveMQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
          }
 
          ActiveMQBuffer buffer = packet.encode(connection);
@@ -277,7 +277,7 @@ public final class ChannelImpl implements Channel
 
          if (isTrace)
          {
-            HornetQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
+            ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
          }
 
 
@@ -301,12 +301,12 @@ public final class ChannelImpl implements Channel
       if (interceptionResult != null)
       {
          // if we don't throw an exception here the client might not unblock
-         throw HornetQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
+         throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult);
       }
 
       if (closed)
       {
-         throw HornetQClientMessageBundle.BUNDLE.connectionDestroyed();
+         throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
       }
 
       if (connection.getBlockingCallTimeout() == -1)
@@ -341,7 +341,7 @@ public final class ChannelImpl implements Channel
                   {
                      if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS))
                      {
-                        HornetQClientLogger.LOGGER.debug("timed-out waiting for failover condition");
+                        ActiveMQClientLogger.LOGGER.debug("timed-out waiting for failover condition");
                      }
                   }
                }
@@ -378,7 +378,7 @@ public final class ChannelImpl implements Channel
 
                if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)
                {
-                  HornetQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
+                  ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace"));
                }
 
                if (closed)
@@ -395,12 +395,12 @@ public final class ChannelImpl implements Channel
 
             if (response == null)
             {
-               throw HornetQClientMessageBundle.BUNDLE.timedOutSendingPacket(packet.getType());
+               throw ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(packet.getType());
             }
 
             if (response.getType() == PacketImpl.EXCEPTION)
             {
-               final HornetQExceptionMessage mem = (HornetQExceptionMessage) response;
+               final ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) response;
 
                ActiveMQException e = mem.getException();
 
@@ -433,13 +433,13 @@ public final class ChannelImpl implements Channel
             {
                boolean callNext = interceptor.intercept(packet, connection);
 
-               if (HornetQClientLogger.LOGGER.isDebugEnabled())
+               if (ActiveMQClientLogger.LOGGER.isDebugEnabled())
                {
                   // use a StringBuilder for speed since this may be executed a lot
                   StringBuilder msg = new StringBuilder();
                   msg.append("Invocation of interceptor ").append(interceptor.getClass().getName()).append(" on ").
                      append(packet).append(" returned ").append(callNext);
-                  HornetQClientLogger.LOGGER.debug(msg.toString());
+                  ActiveMQClientLogger.LOGGER.debug(msg.toString());
                }
 
                if (!callNext)
@@ -449,7 +449,7 @@ public final class ChannelImpl implements Channel
             }
             catch (final Throwable e)
             {
-               HornetQClientLogger.LOGGER.errorCallingInterceptor(e, interceptor);
+               ActiveMQClientLogger.LOGGER.errorCallingInterceptor(e, interceptor);
             }
          }
       }
@@ -488,7 +488,7 @@ public final class ChannelImpl implements Channel
 
       if (!connection.isDestroyed() && !connection.removeChannel(id))
       {
-         throw HornetQClientMessageBundle.BUNDLE.noChannelToClose(id);
+         throw ActiveMQClientMessageBundle.BUNDLE.noChannelToClose(id);
       }
 
       if (failingOver)
@@ -524,7 +524,7 @@ public final class ChannelImpl implements Channel
       {
          if (isTrace)
          {
-            HornetQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id);
+            ActiveMQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id);
          }
          clearUpTo(otherLastConfirmedCommandID);
 
@@ -664,7 +664,7 @@ public final class ChannelImpl implements Channel
 
       if (numberToClear == -1)
       {
-         throw HornetQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID);
+         throw ActiveMQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID);
       }
 
       int sizeToFree = 0;
@@ -677,7 +677,7 @@ public final class ChannelImpl implements Channel
          {
             if (lastReceivedCommandID > 0)
             {
-               HornetQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
+               ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
             }
             firstStoredCommandID = lastReceivedCommandID + 1;
             return;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java
deleted file mode 100644
index c98db97..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.apache.activemq.core.protocol.core.impl;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import io.netty.channel.ChannelPipeline;
-import org.apache.activemq.api.core.ActiveMQBuffer;
-import org.apache.activemq.api.core.ActiveMQException;
-import org.apache.activemq.api.core.ActiveMQExceptionType;
-import org.apache.activemq.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.api.core.Interceptor;
-import org.apache.activemq.api.core.Pair;
-import org.apache.activemq.api.core.SimpleString;
-import org.apache.activemq.api.core.TransportConfiguration;
-import org.apache.activemq.api.core.client.ClientSessionFactory;
-import org.apache.activemq.api.core.client.HornetQClient;
-import org.apache.activemq.core.client.HornetQClientLogger;
-import org.apache.activemq.core.client.HornetQClientMessageBundle;
-import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
-import org.apache.activemq.core.protocol.ClientPacketDecoder;
-import org.apache.activemq.core.protocol.core.Channel;
-import org.apache.activemq.core.protocol.core.ChannelHandler;
-import org.apache.activemq.core.protocol.core.CoreRemotingConnection;
-import org.apache.activemq.core.protocol.core.Packet;
-import org.apache.activemq.core.protocol.core.impl.wireformat.CheckFailoverMessage;
-import org.apache.activemq.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage;
-import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
-import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3;
-import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionMessage;
-import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
-import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectMessage;
-import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
-import org.apache.activemq.core.protocol.core.impl.wireformat.Ping;
-import org.apache.activemq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
-import org.apache.activemq.core.remoting.impl.netty.HornetQFrameDecoder2;
-import org.apache.activemq.core.version.Version;
-import org.apache.activemq.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.spi.core.remoting.ClientProtocolManager;
-import org.apache.activemq.spi.core.remoting.Connection;
-import org.apache.activemq.spi.core.remoting.TopologyResponseHandler;
-import org.apache.activemq.spi.core.remoting.SessionContext;
-import org.apache.activemq.utils.VersionLoader;
-
-/**
- * This class will return specific packets for different types of actions happening on a messaging protocol.
- * <p/>
- * This is trying to unify the Core client into multiple protocols.
- * <p/>
- * Returning null in certain packets means no action is taken on this specific protocol.
- * <p/>
- * Semantic properties could also be added to this implementation.
- * <p/>
- * Implementations of this class need to be stateless.
- *
- * @author Clebert Suconic
- */
-
-public class HornetQClientProtocolManager implements ClientProtocolManager
-{
-   private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
-
-   private ClientSessionFactoryInternal factoryInternal;
-
-   /**
-    * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch}
-    */
-   private final Object inCreateSessionGuard = new Object();
-
-   /**
-    * Flag that tells whether we are trying to create a session.
-    */
-   private boolean inCreateSession;
-
-   /**
-    * Used to wait for the creation of a session.
-    */
-   private CountDownLatch inCreateSessionLatch;
-
-   protected volatile RemotingConnectionImpl connection;
-
-   protected TopologyResponseHandler topologyResponseHandler;
-
-   /**
-    * Flag that signals that the communication is closing. Causes many processes to exit.
-    */
-   private volatile boolean alive = true;
-
-   private final CountDownLatch waitLatch = new CountDownLatch(1);
-
-
-   public HornetQClientProtocolManager()
-   {
-   }
-
-   public String getName()
-   {
-      return HornetQClient.DEFAULT_CORE_PROTOCOL;
-   }
-
-   public void setSessionFactory(ClientSessionFactory factory)
-   {
-      this.factoryInternal = (ClientSessionFactoryInternal)factory;
-   }
-
-   public ClientSessionFactory getSessionFactory()
-   {
-      return this.factoryInternal;
-   }
-
-   @Override
-   public void addChannelHandlers(ChannelPipeline pipeline)
-   {
-      pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2());
-   }
-
-   public boolean waitOnLatch(long milliseconds) throws InterruptedException
-   {
-      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
-   }
-
-   public Channel getChannel0()
-   {
-      if (connection == null)
-      {
-         return null;
-      }
-      else
-      {
-         return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
-      }
-   }
-
-   public RemotingConnection getCurrentConnection()
-   {
-      return connection;
-   }
-
-
-   public Channel getChannel1()
-   {
-      if (connection == null)
-      {
-         return null;
-      }
-      else
-      {
-         return connection.getChannel(1, -1);
-      }
-   }
-
-   public Lock lockSessionCreation()
-   {
-      try
-      {
-         Lock localFailoverLock = factoryInternal.lockFailover();
-         try
-         {
-            if (connection == null)
-            {
-               return null;
-            }
-
-            Lock lock = getChannel1().getLock();
-
-            // Lock it - this must be done while the failoverLock is held
-            while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS))
-            {
-            }
-
-            return lock;
-         }
-         finally
-         {
-            localFailoverLock.unlock();
-         }
-         // We can now release the failoverLock
-      }
-      catch (InterruptedException e)
-      {
-         Thread.currentThread().interrupt();
-         return null;
-      }
-   }
-
-
-   public void stop()
-   {
-      alive = false;
-
-
-      synchronized (inCreateSessionGuard)
-      {
-         if (inCreateSessionLatch != null)
-            inCreateSessionLatch.countDown();
-      }
-
-
-      Channel channel1 = getChannel1();
-      if (channel1 != null)
-      {
-         channel1.returnBlocking();
-      }
-
-      waitLatch.countDown();
-
-   }
-
-   public boolean isAlive()
-   {
-      return alive;
-   }
-
-
-   @Override
-   public void ping(long connectionTTL)
-   {
-      Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1);
-
-      Ping ping = new Ping(connectionTTL);
-
-      channel.send(ping);
-
-      connection.flush();
-   }
-
-   @Override
-   public void sendSubscribeTopology(final boolean isServer)
-   {
-      getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer,
-                                                                      VersionLoader.getVersion()
-                                                                         .getIncrementingVersion()));
-   }
-
-   @Override
-   public SessionContext createSessionContext(String name, String username, String password,
-                                              boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
-                                              boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException
-   {
-      for (Version clientVersion : VersionLoader.getClientVersions())
-      {
-         try
-         {
-            return createSessionContext(clientVersion,
-                                        name,
-                                        username,
-                                        password,
-                                        xa,
-                                        autoCommitSends,
-                                        autoCommitAcks,
-                                        preAcknowledge,
-                                        minLargeMessageSize,
-                                        confirmationWindowSize);
-         }
-         catch (ActiveMQException e)
-         {
-            if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
-            {
-               throw e;
-            }
-         }
-      }
-      connection.destroy();
-      throw new ActiveMQException(ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS);
-   }
-
-   public SessionContext createSessionContext(Version clientVersion, String name, String username, String password,
-                                              boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
-                                              boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException
-   {
-      if (!isAlive())
-         throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed();
-
-      Channel sessionChannel = null;
-      CreateSessionResponseMessage response = null;
-
-      boolean retry;
-      do
-      {
-         retry = false;
-
-         Lock lock = null;
-
-         try
-         {
-
-            lock = lockSessionCreation();
-
-            // We now set a flag saying createSession is executing
-            synchronized (inCreateSessionGuard)
-            {
-               if (!isAlive())
-                  throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed();
-               inCreateSession = true;
-               inCreateSessionLatch = new CountDownLatch(1);
-            }
-
-            long sessionChannelID = connection.generateChannelID();
-
-            Packet request = new CreateSessionMessage(name,
-                                                      sessionChannelID,
-                                                      clientVersion.getIncrementingVersion(),
-                                                      username,
-                                                      password,
-                                                      minLargeMessageSize,
-                                                      xa,
-                                                      autoCommitSends,
-                                                      autoCommitAcks,
-                                                      preAcknowledge,
-                                                      confirmationWindowSize,
-                                                      null);
-
-
-            try
-            {
-               // channel1 reference here has to go away
-               response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP);
-            }
-            catch (ActiveMQException cause)
-            {
-               if (!isAlive())
-                  throw cause;
-
-               if (cause.getType() == ActiveMQExceptionType.UNBLOCKED)
-               {
-                  // This means the thread was blocked on create session and failover unblocked it
-                  // so failover could occur
-
-                  retry = true;
-
-                  continue;
-               }
-               else
-               {
-                  throw cause;
-               }
-            }
-
-            sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize);
-
-
-         }
-         catch (Throwable t)
-         {
-            if (lock != null)
-            {
-               lock.unlock();
-               lock = null;
-            }
-
-            if (t instanceof ActiveMQException)
-            {
-               throw (ActiveMQException) t;
-            }
-            else
-            {
-               throw HornetQClientMessageBundle.BUNDLE.failedToCreateSession(t);
-            }
-         }
-         finally
-         {
-            if (lock != null)
-            {
-               lock.unlock();
-            }
-
-            // Execution has finished so notify any failover thread that may be waiting for us to be done
-            inCreateSession = false;
-            inCreateSessionLatch.countDown();
-         }
-      }
-      while (retry);
-
-
-      // these objects won't be null, otherwise it would keep retrying on the previous loop
-      return new HornetQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize);
-
-   }
-
-   public boolean cleanupBeforeFailover(ActiveMQException cause)
-   {
-
-      boolean needToInterrupt;
-
-      CountDownLatch exitLockLatch;
-      Lock lock = lockSessionCreation();
-
-      if (lock == null)
-      {
-         return false;
-      }
-
-      try
-      {
-         synchronized (inCreateSessionGuard)
-         {
-            needToInterrupt = inCreateSession;
-            exitLockLatch = inCreateSessionLatch;
-         }
-      }
-      finally
-      {
-         lock.unlock();
-      }
-
-      if (needToInterrupt)
-      {
-         forceReturnChannel1(cause);
-
-         // Now we need to make sure that the thread has actually exited and returned it's
-         // connections
-         // before failover occurs
-
-         while (inCreateSession && isAlive())
-         {
-            try
-            {
-               if (exitLockLatch != null)
-               {
-                  exitLockLatch.await(500, TimeUnit.MILLISECONDS);
-               }
-            }
-            catch (InterruptedException e1)
-            {
-               throw new ActiveMQInterruptedException(e1);
-            }
-         }
-      }
-
-      return true;
-   }
-
-   @Override
-   public boolean checkForFailover(String liveNodeID) throws ActiveMQException
-   {
-      CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID);
-      CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet,
-                                                                                                 PacketImpl.CHECK_FOR_FAILOVER_REPLY);
-      return message.isOkToFailover();
-   }
-
-
-   public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout,
-                                     List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors,
-                                     TopologyResponseHandler topologyResponseHandler)
-   {
-      this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection,
-                                                                             callTimeout, callFailoverTimeout,
-                                                                             incomingInterceptors, outgoingInterceptors);
-
-      this.topologyResponseHandler = topologyResponseHandler;
-
-      getChannel0().setHandler(new Channel0Handler(connection));
-
-
-      sendHandshake(transportConnection);
-
-      return connection;
-   }
-
-   private void sendHandshake(Connection transportConnection)
-   {
-      if (transportConnection.isUsingProtocolHandling())
-      {
-         // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
-         String handshake = "HORNETQ";
-         ActiveMQBuffer hqbuffer = connection.createBuffer(handshake.length());
-         hqbuffer.writeBytes(handshake.getBytes());
-         transportConnection.write(hqbuffer);
-      }
-   }
-
-
-   private class Channel0Handler implements ChannelHandler
-   {
-      private final CoreRemotingConnection conn;
-
-      private Channel0Handler(final CoreRemotingConnection conn)
-      {
-         this.conn = conn;
-      }
-
-      public void handlePacket(final Packet packet)
-      {
-         final byte type = packet.getType();
-
-         if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2)
-         {
-            final DisconnectMessage msg = (DisconnectMessage) packet;
-            String scaleDownTargetNodeID = null;
-
-            SimpleString nodeID = msg.getNodeID();
-
-            if (packet instanceof DisconnectMessage_V2)
-            {
-               final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet;
-               scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString();
-            }
-
-            if (topologyResponseHandler != null)
-               topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID);
-         }
-         else if (type == PacketImpl.CLUSTER_TOPOLOGY)
-         {
-            ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
-            notifyTopologyChange(topMessage);
-         }
-         else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2)
-         {
-            ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet;
-            notifyTopologyChange(topMessage);
-         }
-         else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3)
-         {
-            ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet;
-            notifyTopologyChange(topMessage);
-         }
-         else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY)
-         {
-            System.out.println("Channel0Handler.handlePacket");
-         }
-      }
-
-      /**
-       * @param topMessage
-       */
-      private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage)
-      {
-         final long eventUID;
-         final String backupGroupName;
-         final String scaleDownGroupName;
-         if (topMessage instanceof ClusterTopologyChangeMessage_V3)
-         {
-            eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID();
-            backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName();
-            scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName();
-         }
-         else if (topMessage instanceof ClusterTopologyChangeMessage_V2)
-         {
-            eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID();
-            backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName();
-            scaleDownGroupName = null;
-         }
-         else
-         {
-            eventUID = System.currentTimeMillis();
-            backupGroupName = null;
-            scaleDownGroupName = null;
-         }
-
-         if (topMessage.isExit())
-         {
-            if (HornetQClientLogger.LOGGER.isDebugEnabled())
-            {
-               HornetQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down");
-            }
-
-            if (topologyResponseHandler != null)
-            {
-               topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID());
-            }
-         }
-         else
-         {
-            Pair<TransportConfiguration, TransportConfiguration> transportConfig = topMessage.getPair();
-            if (transportConfig.getA() == null && transportConfig.getB() == null)
-            {
-               transportConfig = new Pair<>(conn.getTransportConnection()
-                                               .getConnectorConfig(),
-                                            null);
-            }
-
-            if (topologyResponseHandler != null)
-            {
-               topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast());
-            }
-         }
-      }
-   }
-
-   protected PacketDecoder getPacketDecoder()
-   {
-      return ClientPacketDecoder.INSTANCE;
-   }
-
-   private void forceReturnChannel1(ActiveMQException cause)
-   {
-      if (connection != null)
-      {
-         Channel channel1 = connection.getChannel(1, -1);
-
-         if (channel1 != null)
-         {
-            channel1.returnBlocking(cause);
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
deleted file mode 100644
index 103e20d..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.apache.activemq.core.protocol.core.impl;
-
-import org.apache.activemq.spi.core.remoting.ClientProtocolManager;
-import org.apache.activemq.spi.core.remoting.ClientProtocolManagerFactory;
-
-/**
- * @author Clebert Suconic
- */
-
-public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory
-{
-   private static final long serialVersionUID = 1;
-
-   private static final HornetQClientProtocolManagerFactory INSTANCE = new HornetQClientProtocolManagerFactory();
-
-   private HornetQClientProtocolManagerFactory()
-   {
-   }
-
-   public static final HornetQClientProtocolManagerFactory getInstance()
-   {
-      return INSTANCE;
-   }
-
-   public ClientProtocolManager newProtocolManager()
-   {
-      return new HornetQClientProtocolManager();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java
deleted file mode 100644
index 27585e8..0000000
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.apache.activemq.core.protocol.core.impl;
-
-import org.apache.activemq.spi.core.remoting.ConsumerContext;
-
-/**
- * @author Clebert Suconic
- */
-
-public class HornetQConsumerContext extends ConsumerContext
-{
-   private long id;
-
-   public HornetQConsumerContext(long id)
-   {
-      this.id = id;
-   }
-
-   public long getId()
-   {
-      return id;
-   }
-
-   @Override
-   public boolean equals(Object o)
-   {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      HornetQConsumerContext that = (HornetQConsumerContext) o;
-
-      if (id != that.id) return false;
-
-      return true;
-   }
-
-   @Override
-   public int hashCode()
-   {
-      return (int) (id ^ (id >>> 32));
-   }
-}


Mime
View raw message