Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C53C9100B6 for ; Tue, 18 Nov 2014 23:37:59 +0000 (UTC) Received: (qmail 99127 invoked by uid 500); 18 Nov 2014 23:37:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 99027 invoked by uid 500); 18 Nov 2014 23:37:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 98617 invoked by uid 99); 18 Nov 2014 23:37:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Nov 2014 23:37:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 100E49A95D9; Tue, 18 Nov 2014 23:37:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 18 Nov 2014 23:38:38 -0000 Message-Id: <3d2338e98bb64a3c96eec74adb8125af@git.apache.org> In-Reply-To: <02d06507438644d0a31b17f687b6b82f@git.apache.org> References: <02d06507438644d0a31b17f687b6b82f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [45/52] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 Rename HornetQ* classes to ActiveMQ* http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java new file mode 100644 index 0000000..4e3d34c --- /dev/null +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java @@ -0,0 +1,940 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.core.protocol.core.impl; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + +import org.apache.activemq.api.core.ActiveMQBuffer; +import org.apache.activemq.api.core.ActiveMQException; +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.Message; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.api.core.client.ClientConsumer; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.core.client.ActiveMQClientLogger; +import org.apache.activemq.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.core.client.impl.AddressQueryImpl; +import org.apache.activemq.core.client.impl.ClientConsumerImpl; +import org.apache.activemq.core.client.impl.ClientConsumerInternal; +import org.apache.activemq.core.client.impl.ClientLargeMessageInternal; +import org.apache.activemq.core.client.impl.ClientMessageInternal; +import org.apache.activemq.core.client.impl.ClientProducerCreditsImpl; +import org.apache.activemq.core.client.impl.ClientSessionImpl; +import org.apache.activemq.core.message.impl.MessageInternal; +import org.apache.activemq.core.protocol.core.Channel; +import org.apache.activemq.core.protocol.core.ChannelHandler; +import org.apache.activemq.core.protocol.core.CommandConfirmationHandler; +import org.apache.activemq.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.core.protocol.core.Packet; +import org.apache.activemq.core.protocol.core.impl.wireformat.CreateQueueMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectConsumerMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.ReattachSessionMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.RollbackMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCloseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionExpireMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionIndividualAcknowledgeMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsFailMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendLargeMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionSendMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXACommitMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAEndMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAForgetMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAJoinMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAPrepareMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAResumeMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXARollbackMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.SessionXAStartMessage; +import org.apache.activemq.spi.core.protocol.RemotingConnection; +import org.apache.activemq.spi.core.remoting.Connection; +import org.apache.activemq.spi.core.remoting.SessionContext; +import org.apache.activemq.utils.TokenBucketLimiterImpl; + +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER; +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.EXCEPTION; +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; +import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; + +/** + * @author Clebert Suconic + */ + +public class ActiveMQSessionContext extends SessionContext +{ + private final Channel sessionChannel; + private final int serverVersion; + private int confirmationWindow; + private final String name; + + + public ActiveMQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow) + { + super(remotingConnection); + + this.name = name; + this.sessionChannel = sessionChannel; + this.serverVersion = serverVersion; + this.confirmationWindow = confirmationWindow; + + ChannelHandler handler = new ClientSessionPacketHandler(); + sessionChannel.setHandler(handler); + + + if (confirmationWindow >= 0) + { + sessionChannel.setCommandConfirmationHandler(confirmationHandler); + } + } + + + private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() + { + public void commandConfirmed(final Packet packet) + { + if (packet.getType() == PacketImpl.SESS_SEND) + { + SessionSendMessage ssm = (SessionSendMessage) packet; + callSendAck(ssm.getHandler(), ssm.getMessage()); + } + else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) + { + SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; + if (!scm.isContinues()) + { + callSendAck(scm.getHandler(), scm.getMessage()); + } + } + } + + private void callSendAck(SendAcknowledgementHandler handler, final Message message) + { + if (handler != null) + { + handler.sendAcknowledged(message); + } + else if (sendAckHandler != null) + { + sendAckHandler.sendAcknowledged(message); + } + } + + }; + + + // Failover utility methods + + @Override + public void returnBlocking(ActiveMQException cause) + { + sessionChannel.returnBlocking(cause); + } + + @Override + public void lockCommunications() + { + sessionChannel.lock(); + } + + @Override + public void releaseCommunications() + { + sessionChannel.setTransferring(false); + sessionChannel.unlock(); + } + + public void cleanup() + { + sessionChannel.close(); + + // if the server is sending a disconnect + // any pending blocked operation could hang without this + sessionChannel.returnBlocking(); + } + + @Override + public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) + { + // nothing to be done here... Flow control here is done on the core side + } + + + public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) + { + sessionChannel.setCommandConfirmationHandler(confirmationHandler); + this.sendAckHandler = handler; + } + + public void createSharedQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable) throws ActiveMQException + { + sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE); + } + + public void deleteQueue(final SimpleString queueName) throws ActiveMQException + { + sessionChannel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE); + } + + public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException + { + SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); + SessionQueueQueryResponseMessage response = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + return response.toQueueQuery(); + + } + + + public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, + int windowSize, int maxRate, int ackBatchSize, boolean browseOnly, + Executor executor, Executor flowControlExecutor) throws ActiveMQException + { + long consumerID = idGenerator.generateID(); + + ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); + + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, + queueName, + filterString, + browseOnly, + true); + + SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP); + + // The actual windows size that gets used is determined by the user since + // could be overridden on the queue settings + // The value we send is just a hint + + return new ClientConsumerImpl(session, + consumerContext, + queueName, + filterString, + browseOnly, + calcWindowSize(windowSize), + ackBatchSize, + maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, + false) + : null, + executor, + flowControlExecutor, + this, + queueInfo.toQueueQuery(), + lookupTCCL()); + } + + + public int getServerVersion() + { + return serverVersion; + } + + public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException + { + SessionBindingQueryResponseMessage response = + (SessionBindingQueryResponseMessage) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); + + return new AddressQueryImpl(response.isExists(), response.getQueueNames()); + } + + + @Override + public void closeConsumer(final ClientConsumer consumer) throws ActiveMQException + { + sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE); + } + + public void sendConsumerCredits(final ClientConsumer consumer, final int credits) + { + sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits)); + } + + public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException + { + SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence); + sessionChannel.send(request); + } + + public void simpleCommit() throws ActiveMQException + { + sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); + } + + public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException + { + sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE); + } + + public void sessionStart() throws ActiveMQException + { + sessionChannel.send(new PacketImpl(PacketImpl.SESS_START)); + } + + public void sessionStop() throws ActiveMQException + { + sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE); + } + + public void addSessionMetadata(String key, String data) throws ActiveMQException + { + sessionChannel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE); + } + + + public void addUniqueMetaData(String key, String data) throws ActiveMQException + { + sessionChannel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE); + } + + public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException + { + SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase); + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) + { + ActiveMQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response); + } + } + + public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException + { + Packet packet; + if (flags == XAResource.TMSUSPEND) + { + packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND); + } + else if (flags == XAResource.TMSUCCESS) + { + packet = new SessionXAEndMessage(xid, false); + } + else if (flags == XAResource.TMFAIL) + { + packet = new SessionXAEndMessage(xid, true); + } + else + { + throw new XAException(XAException.XAER_INVAL); + } + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + } + + + public void sendProducerCreditsMessage(final int credits, final SimpleString address) + { + sessionChannel.send(new SessionRequestProducerCreditsMessage(credits, address)); + } + + /** + * ActiveMQ does support large messages + * + * @return + */ + public boolean supportsLargeMessage() + { + return true; + } + + @Override + public int getCreditsOnSendingFull(MessageInternal msgI) + { + return msgI.getEncodeSize(); + } + + public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException + { + SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler); + + if (sendBlocking) + { + sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.sendBatched(packet); + } + } + + @Override + public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException + { + SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI); + + sessionChannel.send(initialChunk); + + return msgI.getHeadersAndPropertiesEncodeSize(); + } + + @Override + public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException + { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = + new SessionSendContinuationMessage(msgI, chunk, !lastChunk, + requiresResponse, messageBodySize, messageHandler); + + if (requiresResponse) + { + // When sending it blocking, only the last chunk will be blocking. + sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.send(chunkPacket); + } + + return chunkPacket.getPacketSize(); + } + + public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws ActiveMQException + { + PacketImpl messagePacket; + if (individual) + { + messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); + } + else + { + messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); + } + + if (block) + { + sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE); + } + else + { + sessionChannel.sendBatched(messagePacket); + } + } + + public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException + { + SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID()); + + sessionChannel.send(messagePacket); + } + + + public void sessionClose() throws ActiveMQException + { + sessionChannel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE); + } + + public void xaForget(Xid xid) throws XAException, ActiveMQException + { + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + } + + public int xaPrepare(Xid xid) throws XAException, ActiveMQException + { + SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid); + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + else + { + return response.getResponseCode(); + } + } + + public Xid[] xaScan() throws ActiveMQException + { + SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP); + + List xids = response.getXids(); + + Xid[] xidArray = xids.toArray(new Xid[xids.size()]); + + return xidArray; + } + + public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException + { + SessionXARollbackMessage packet = new SessionXARollbackMessage(xid); + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + throw new XAException(response.getResponseCode()); + } + } + + public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException + { + Packet packet; + if (flags == XAResource.TMJOIN) + { + packet = new SessionXAJoinMessage(xid); + } + else if (flags == XAResource.TMRESUME) + { + packet = new SessionXAResumeMessage(xid); + } + else if (flags == XAResource.TMNOFLAGS) + { + // Don't need to flush since the previous end will have done this + packet = new SessionXAStartMessage(xid); + } + else + { + throw new XAException(XAException.XAER_INVAL); + } + + SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); + + if (response.isError()) + { + ActiveMQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode()); + throw new XAException(response.getResponseCode()); + } + } + + public boolean configureTransactionTimeout(int seconds) throws ActiveMQException + { + SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) sessionChannel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP); + + return response.isOK(); + } + + public int recoverSessionTimeout() throws ActiveMQException + { + SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP); + + return response.getTimeoutSeconds(); + } + + public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws ActiveMQException + { + CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true); + sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); + } + + @Override + public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException + { + + this.remotingConnection = newConnection; + + sessionChannel.transferConnection((CoreRemotingConnection) newConnection); + + Packet request = new ReattachSessionMessage(name, sessionChannel.getLastConfirmedCommandID()); + + Channel channel1 = getCoreConnection().getChannel(1, -1); + + ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP); + + if (response.isReattached()) + { + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) + { + ActiveMQClientLogger.LOGGER.debug("ClientSession reattached fine, replaying commands"); + } + // The session was found on the server - we reattached transparently ok + + sessionChannel.replayCommands(response.getLastConfirmedCommandID()); + + return true; + } + else + { + + sessionChannel.clearCommands(); + + return false; + } + + } + + public void recreateSession(final String username, + final String password, + final int minLargeMessageSize, + final boolean xa, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final SimpleString defaultAddress) throws ActiveMQException + { + Packet createRequest = new CreateSessionMessage(name, + sessionChannel.getID(), + getServerVersion(), + username, + password, + minLargeMessageSize, + xa, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + confirmationWindow, + defaultAddress == null ? null + : defaultAddress.toString()); + boolean retry; + do + { + try + { + getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP); + retry = false; + } + catch (ActiveMQException e) + { + // the session was created while its server was starting, retry it: + if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) + { + ActiveMQClientLogger.LOGGER.retryCreateSessionSeverStarting(name); + retry = true; + // sleep a little bit to avoid spinning too much + try + { + Thread.sleep(10); + } + catch (InterruptedException ie) + { + Thread.currentThread().interrupt(); + throw e; + } + } + else + { + throw e; + } + } + } + while (retry && !session.isClosing()); + } + + @Override + public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException + { + ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); + + // We try and recreate any non durable queues, since they probably won't be there unless + // they are defined in hornetq-configuration.xml + // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover + if (!queueInfo.isDurable()) + { + CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), + queueInfo.getName(), + queueInfo.getFilterString(), + false, + queueInfo.isTemporary(), + false); + + sendPacketWithoutLock(sessionChannel, createQueueRequest); + } + + SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), + consumerInternal.getQueueName(), + consumerInternal.getFilterString(), + consumerInternal.isBrowseOnly(), + false); + + sendPacketWithoutLock(sessionChannel, createConsumerRequest); + + int clientWindowSize = consumerInternal.getClientWindowSize(); + + if (clientWindowSize != 0) + { + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), + clientWindowSize); + + sendPacketWithoutLock(sessionChannel, packet); + } + else + { + // https://jira.jboss.org/browse/HORNETQ-522 + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), + 1); + sendPacketWithoutLock(sessionChannel, packet); + } + } + + public void xaFailed(Xid xid) throws ActiveMQException + { + sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid)); + } + + public void restartSession() throws ActiveMQException + { + sendPacketWithoutLock(sessionChannel, new PacketImpl(PacketImpl.SESS_START)); + } + + @Override + public void resetMetadata(HashMap metaDataToSend) + { + // Resetting the metadata after failover + for (Map.Entry entries : metaDataToSend.entrySet()) + { + sendPacketWithoutLock(sessionChannel, new SessionAddMetaDataMessageV2(entries.getKey(), entries.getValue(), false)); + } + } + + + private Channel getCreateChannel() + { + return getCoreConnection().getChannel(1, -1); + } + + private CoreRemotingConnection getCoreConnection() + { + return (CoreRemotingConnection) remotingConnection; + } + + + /** + * This doesn't apply to other protocols probably, so it will be an ActiveMQ exclusive feature + * + * @throws org.apache.activemq.api.core.ActiveMQException + */ + private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws ActiveMQException + { + DisconnectConsumerMessage message = packet; + + session.handleConsumerDisconnect(new ActiveMQConsumerContext(message.getConsumerId())); + } + + private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception + { + ClientMessageInternal msgi = (ClientMessageInternal) messagePacket.getMessage(); + + msgi.setDeliveryCount(messagePacket.getDeliveryCount()); + + msgi.setFlowControlSize(messagePacket.getPacketSize()); + + handleReceiveMessage(new ActiveMQConsumerContext(messagePacket.getConsumerID()), msgi); + } + + private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception + { + ClientLargeMessageInternal clientLargeMessage = (ClientLargeMessageInternal) serverPacket.getLargeMessage(); + + clientLargeMessage.setFlowControlSize(serverPacket.getPacketSize()); + + clientLargeMessage.setDeliveryCount(serverPacket.getDeliveryCount()); + + handleReceiveLargeMessage(new ActiveMQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize()); + } + + + private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception + { + handleReceiveContinuation(new ActiveMQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(), + continuationPacket.isContinues()); + } + + + protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) + { + handleReceiveProducerCredits(message.getAddress(), message.getCredits()); + } + + + protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message) + { + handleReceiveProducerFailCredits(message.getAddress(), message.getCredits()); + } + + class ClientSessionPacketHandler implements ChannelHandler + { + + public void handlePacket(final Packet packet) + { + byte type = packet.getType(); + + try + { + switch (type) + { + case DISCONNECT_CONSUMER: + { + handleConsumerDisconnected((DisconnectConsumerMessage) packet); + break; + } + case SESS_RECEIVE_CONTINUATION: + { + handleReceiveContinuation((SessionReceiveContinuationMessage) packet); + + break; + } + case SESS_RECEIVE_MSG: + { + handleReceivedMessagePacket((SessionReceiveMessage) packet); + + break; + } + case SESS_RECEIVE_LARGE_MSG: + { + handleReceiveLargeMessage((SessionReceiveLargeMessage) packet); + + break; + } + case PacketImpl.SESS_PRODUCER_CREDITS: + { + handleReceiveProducerCredits((SessionProducerCreditsMessage) packet); + + break; + } + case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: + { + handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet); + + break; + } + case EXCEPTION: + { + // We can only log these exceptions + // maybe we should cache it on SessionContext and throw an exception on any next calls + ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet; + + ActiveMQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException()); + + break; + } + default: + { + throw new IllegalStateException("Invalid packet: " + type); + } + } + } + catch (Exception e) + { + ActiveMQClientLogger.LOGGER.failedToHandlePacket(e); + } + + sessionChannel.confirm(packet); + } + } + + private long getConsumerID(ClientConsumer consumer) + { + return ((ActiveMQConsumerContext)consumer.getConsumerContext()).getId(); + } + + private ClassLoader lookupTCCL() + { + return AccessController.doPrivileged(new PrivilegedAction() + { + public ClassLoader run() + { + return Thread.currentThread().getContextClassLoader(); + } + }); + + } + + private int calcWindowSize(final int windowSize) + { + int clientWindowSize; + if (windowSize == -1) + { + // No flow control - buffer can increase without bound! Only use with + // caution for very fast consumers + clientWindowSize = -1; + } + else if (windowSize == 0) + { + // Slow consumer - no buffering + clientWindowSize = 0; + } + else if (windowSize == 1) + { + // Slow consumer = buffer 1 + clientWindowSize = 1; + } + else if (windowSize > 1) + { + // Client window size is half server window size + clientWindowSize = windowSize >> 1; + } + else + { + throw ActiveMQClientMessageBundle.BUNDLE.invalidWindowSize(windowSize); + } + + return clientWindowSize; + } + + + private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet) + { + packet.setChannelID(parameterChannel.getID()); + + Connection conn = parameterChannel.getConnection().getTransportConnection(); + + ActiveMQBuffer buffer = packet.encode(this.getCoreConnection()); + + conn.write(buffer, false, false); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java index c5709b4..de7a3ef 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java @@ -25,14 +25,14 @@ import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQException; import org.apache.activemq.api.core.ActiveMQInterruptedException; import org.apache.activemq.api.core.Interceptor; -import org.apache.activemq.core.client.HornetQClientLogger; -import org.apache.activemq.core.client.HornetQClientMessageBundle; +import org.apache.activemq.core.client.ActiveMQClientLogger; +import org.apache.activemq.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.core.protocol.core.Channel; import org.apache.activemq.core.protocol.core.ChannelHandler; import org.apache.activemq.core.protocol.core.CommandConfirmationHandler; import org.apache.activemq.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.core.protocol.core.Packet; -import org.apache.activemq.core.protocol.core.impl.wireformat.HornetQExceptionMessage; +import org.apache.activemq.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.spi.core.protocol.RemotingConnection; @@ -83,7 +83,7 @@ public final class ChannelImpl implements Channel } } - private static final boolean isTrace = HornetQClientLogger.LOGGER.isTraceEnabled(); + private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled(); private volatile long id; @@ -193,7 +193,7 @@ public final class ChannelImpl implements Channel try { - response = new HornetQExceptionMessage(HornetQClientMessageBundle.BUNDLE.unblockingACall(cause)); + response = new ActiveMQExceptionMessage(ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause)); sendCondition.signal(); } @@ -237,7 +237,7 @@ public final class ChannelImpl implements Channel if (isTrace) { - HornetQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id); + ActiveMQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id); } ActiveMQBuffer buffer = packet.encode(connection); @@ -277,7 +277,7 @@ public final class ChannelImpl implements Channel if (isTrace) { - HornetQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id); + ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id); } @@ -301,12 +301,12 @@ public final class ChannelImpl implements Channel if (interceptionResult != null) { // if we don't throw an exception here the client might not unblock - throw HornetQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult); + throw ActiveMQClientMessageBundle.BUNDLE.interceptorRejectedPacket(interceptionResult); } if (closed) { - throw HornetQClientMessageBundle.BUNDLE.connectionDestroyed(); + throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed(); } if (connection.getBlockingCallTimeout() == -1) @@ -341,7 +341,7 @@ public final class ChannelImpl implements Channel { if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) { - HornetQClientLogger.LOGGER.debug("timed-out waiting for failover condition"); + ActiveMQClientLogger.LOGGER.debug("timed-out waiting for failover condition"); } } } @@ -378,7 +378,7 @@ public final class ChannelImpl implements Channel if (response != null && response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket) { - HornetQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); + ActiveMQClientLogger.LOGGER.packetOutOfOrder(response, new Exception("trace")); } if (closed) @@ -395,12 +395,12 @@ public final class ChannelImpl implements Channel if (response == null) { - throw HornetQClientMessageBundle.BUNDLE.timedOutSendingPacket(packet.getType()); + throw ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(packet.getType()); } if (response.getType() == PacketImpl.EXCEPTION) { - final HornetQExceptionMessage mem = (HornetQExceptionMessage) response; + final ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) response; ActiveMQException e = mem.getException(); @@ -433,13 +433,13 @@ public final class ChannelImpl implements Channel { boolean callNext = interceptor.intercept(packet, connection); - if (HornetQClientLogger.LOGGER.isDebugEnabled()) + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { // use a StringBuilder for speed since this may be executed a lot StringBuilder msg = new StringBuilder(); msg.append("Invocation of interceptor ").append(interceptor.getClass().getName()).append(" on "). append(packet).append(" returned ").append(callNext); - HornetQClientLogger.LOGGER.debug(msg.toString()); + ActiveMQClientLogger.LOGGER.debug(msg.toString()); } if (!callNext) @@ -449,7 +449,7 @@ public final class ChannelImpl implements Channel } catch (final Throwable e) { - HornetQClientLogger.LOGGER.errorCallingInterceptor(e, interceptor); + ActiveMQClientLogger.LOGGER.errorCallingInterceptor(e, interceptor); } } } @@ -488,7 +488,7 @@ public final class ChannelImpl implements Channel if (!connection.isDestroyed() && !connection.removeChannel(id)) { - throw HornetQClientMessageBundle.BUNDLE.noChannelToClose(id); + throw ActiveMQClientMessageBundle.BUNDLE.noChannelToClose(id); } if (failingOver) @@ -524,7 +524,7 @@ public final class ChannelImpl implements Channel { if (isTrace) { - HornetQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id); + ActiveMQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id); } clearUpTo(otherLastConfirmedCommandID); @@ -664,7 +664,7 @@ public final class ChannelImpl implements Channel if (numberToClear == -1) { - throw HornetQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID); + throw ActiveMQClientMessageBundle.BUNDLE.invalidCommandID(lastReceivedCommandID); } int sizeToFree = 0; @@ -677,7 +677,7 @@ public final class ChannelImpl implements Channel { if (lastReceivedCommandID > 0) { - HornetQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); + ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); } firstStoredCommandID = lastReceivedCommandID + 1; return; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java deleted file mode 100644 index c98db97..0000000 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManager.java +++ /dev/null @@ -1,613 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.core.impl; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; - -import io.netty.channel.ChannelPipeline; -import org.apache.activemq.api.core.ActiveMQBuffer; -import org.apache.activemq.api.core.ActiveMQException; -import org.apache.activemq.api.core.ActiveMQExceptionType; -import org.apache.activemq.api.core.ActiveMQInterruptedException; -import org.apache.activemq.api.core.Interceptor; -import org.apache.activemq.api.core.Pair; -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.core.client.HornetQClient; -import org.apache.activemq.core.client.HornetQClientLogger; -import org.apache.activemq.core.client.HornetQClientMessageBundle; -import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal; -import org.apache.activemq.core.protocol.ClientPacketDecoder; -import org.apache.activemq.core.protocol.core.Channel; -import org.apache.activemq.core.protocol.core.ChannelHandler; -import org.apache.activemq.core.protocol.core.CoreRemotingConnection; -import org.apache.activemq.core.protocol.core.Packet; -import org.apache.activemq.core.protocol.core.impl.wireformat.CheckFailoverMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2; -import org.apache.activemq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3; -import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectMessage; -import org.apache.activemq.core.protocol.core.impl.wireformat.DisconnectMessage_V2; -import org.apache.activemq.core.protocol.core.impl.wireformat.Ping; -import org.apache.activemq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2; -import org.apache.activemq.core.remoting.impl.netty.HornetQFrameDecoder2; -import org.apache.activemq.core.version.Version; -import org.apache.activemq.spi.core.protocol.RemotingConnection; -import org.apache.activemq.spi.core.remoting.ClientProtocolManager; -import org.apache.activemq.spi.core.remoting.Connection; -import org.apache.activemq.spi.core.remoting.TopologyResponseHandler; -import org.apache.activemq.spi.core.remoting.SessionContext; -import org.apache.activemq.utils.VersionLoader; - -/** - * This class will return specific packets for different types of actions happening on a messaging protocol. - *

- * This is trying to unify the Core client into multiple protocols. - *

- * Returning null in certain packets means no action is taken on this specific protocol. - *

- * Semantic properties could also be added to this implementation. - *

- * Implementations of this class need to be stateless. - * - * @author Clebert Suconic - */ - -public class HornetQClientProtocolManager implements ClientProtocolManager -{ - private final int versionID = VersionLoader.getVersion().getIncrementingVersion(); - - private ClientSessionFactoryInternal factoryInternal; - - /** - * Guards assignments to {@link #inCreateSession} and {@link #inCreateSessionLatch} - */ - private final Object inCreateSessionGuard = new Object(); - - /** - * Flag that tells whether we are trying to create a session. - */ - private boolean inCreateSession; - - /** - * Used to wait for the creation of a session. - */ - private CountDownLatch inCreateSessionLatch; - - protected volatile RemotingConnectionImpl connection; - - protected TopologyResponseHandler topologyResponseHandler; - - /** - * Flag that signals that the communication is closing. Causes many processes to exit. - */ - private volatile boolean alive = true; - - private final CountDownLatch waitLatch = new CountDownLatch(1); - - - public HornetQClientProtocolManager() - { - } - - public String getName() - { - return HornetQClient.DEFAULT_CORE_PROTOCOL; - } - - public void setSessionFactory(ClientSessionFactory factory) - { - this.factoryInternal = (ClientSessionFactoryInternal)factory; - } - - public ClientSessionFactory getSessionFactory() - { - return this.factoryInternal; - } - - @Override - public void addChannelHandlers(ChannelPipeline pipeline) - { - pipeline.addLast("hornetq-decoder", new HornetQFrameDecoder2()); - } - - public boolean waitOnLatch(long milliseconds) throws InterruptedException - { - return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS); - } - - public Channel getChannel0() - { - if (connection == null) - { - return null; - } - else - { - return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); - } - } - - public RemotingConnection getCurrentConnection() - { - return connection; - } - - - public Channel getChannel1() - { - if (connection == null) - { - return null; - } - else - { - return connection.getChannel(1, -1); - } - } - - public Lock lockSessionCreation() - { - try - { - Lock localFailoverLock = factoryInternal.lockFailover(); - try - { - if (connection == null) - { - return null; - } - - Lock lock = getChannel1().getLock(); - - // Lock it - this must be done while the failoverLock is held - while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS)) - { - } - - return lock; - } - finally - { - localFailoverLock.unlock(); - } - // We can now release the failoverLock - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - return null; - } - } - - - public void stop() - { - alive = false; - - - synchronized (inCreateSessionGuard) - { - if (inCreateSessionLatch != null) - inCreateSessionLatch.countDown(); - } - - - Channel channel1 = getChannel1(); - if (channel1 != null) - { - channel1.returnBlocking(); - } - - waitLatch.countDown(); - - } - - public boolean isAlive() - { - return alive; - } - - - @Override - public void ping(long connectionTTL) - { - Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); - - Ping ping = new Ping(connectionTTL); - - channel.send(ping); - - connection.flush(); - } - - @Override - public void sendSubscribeTopology(final boolean isServer) - { - getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, - VersionLoader.getVersion() - .getIncrementingVersion())); - } - - @Override - public SessionContext createSessionContext(String name, String username, String password, - boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException - { - for (Version clientVersion : VersionLoader.getClientVersions()) - { - try - { - return createSessionContext(clientVersion, - name, - username, - password, - xa, - autoCommitSends, - autoCommitAcks, - preAcknowledge, - minLargeMessageSize, - confirmationWindowSize); - } - catch (ActiveMQException e) - { - if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) - { - throw e; - } - } - } - connection.destroy(); - throw new ActiveMQException(ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS); - } - - public SessionContext createSessionContext(Version clientVersion, String name, String username, String password, - boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException - { - if (!isAlive()) - throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed(); - - Channel sessionChannel = null; - CreateSessionResponseMessage response = null; - - boolean retry; - do - { - retry = false; - - Lock lock = null; - - try - { - - lock = lockSessionCreation(); - - // We now set a flag saying createSession is executing - synchronized (inCreateSessionGuard) - { - if (!isAlive()) - throw HornetQClientMessageBundle.BUNDLE.clientSessionClosed(); - inCreateSession = true; - inCreateSessionLatch = new CountDownLatch(1); - } - - long sessionChannelID = connection.generateChannelID(); - - Packet request = new CreateSessionMessage(name, - sessionChannelID, - clientVersion.getIncrementingVersion(), - username, - password, - minLargeMessageSize, - xa, - autoCommitSends, - autoCommitAcks, - preAcknowledge, - confirmationWindowSize, - null); - - - try - { - // channel1 reference here has to go away - response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP); - } - catch (ActiveMQException cause) - { - if (!isAlive()) - throw cause; - - if (cause.getType() == ActiveMQExceptionType.UNBLOCKED) - { - // This means the thread was blocked on create session and failover unblocked it - // so failover could occur - - retry = true; - - continue; - } - else - { - throw cause; - } - } - - sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize); - - - } - catch (Throwable t) - { - if (lock != null) - { - lock.unlock(); - lock = null; - } - - if (t instanceof ActiveMQException) - { - throw (ActiveMQException) t; - } - else - { - throw HornetQClientMessageBundle.BUNDLE.failedToCreateSession(t); - } - } - finally - { - if (lock != null) - { - lock.unlock(); - } - - // Execution has finished so notify any failover thread that may be waiting for us to be done - inCreateSession = false; - inCreateSessionLatch.countDown(); - } - } - while (retry); - - - // these objects won't be null, otherwise it would keep retrying on the previous loop - return new HornetQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); - - } - - public boolean cleanupBeforeFailover(ActiveMQException cause) - { - - boolean needToInterrupt; - - CountDownLatch exitLockLatch; - Lock lock = lockSessionCreation(); - - if (lock == null) - { - return false; - } - - try - { - synchronized (inCreateSessionGuard) - { - needToInterrupt = inCreateSession; - exitLockLatch = inCreateSessionLatch; - } - } - finally - { - lock.unlock(); - } - - if (needToInterrupt) - { - forceReturnChannel1(cause); - - // Now we need to make sure that the thread has actually exited and returned it's - // connections - // before failover occurs - - while (inCreateSession && isAlive()) - { - try - { - if (exitLockLatch != null) - { - exitLockLatch.await(500, TimeUnit.MILLISECONDS); - } - } - catch (InterruptedException e1) - { - throw new ActiveMQInterruptedException(e1); - } - } - } - - return true; - } - - @Override - public boolean checkForFailover(String liveNodeID) throws ActiveMQException - { - CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID); - CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet, - PacketImpl.CHECK_FOR_FAILOVER_REPLY); - return message.isOkToFailover(); - } - - - public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, - List incomingInterceptors, List outgoingInterceptors, - TopologyResponseHandler topologyResponseHandler) - { - this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, - callTimeout, callFailoverTimeout, - incomingInterceptors, outgoingInterceptors); - - this.topologyResponseHandler = topologyResponseHandler; - - getChannel0().setHandler(new Channel0Handler(connection)); - - - sendHandshake(transportConnection); - - return connection; - } - - private void sendHandshake(Connection transportConnection) - { - if (transportConnection.isUsingProtocolHandling()) - { - // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling - String handshake = "HORNETQ"; - ActiveMQBuffer hqbuffer = connection.createBuffer(handshake.length()); - hqbuffer.writeBytes(handshake.getBytes()); - transportConnection.write(hqbuffer); - } - } - - - private class Channel0Handler implements ChannelHandler - { - private final CoreRemotingConnection conn; - - private Channel0Handler(final CoreRemotingConnection conn) - { - this.conn = conn; - } - - public void handlePacket(final Packet packet) - { - final byte type = packet.getType(); - - if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2) - { - final DisconnectMessage msg = (DisconnectMessage) packet; - String scaleDownTargetNodeID = null; - - SimpleString nodeID = msg.getNodeID(); - - if (packet instanceof DisconnectMessage_V2) - { - final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet; - scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString(); - } - - if (topologyResponseHandler != null) - topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); - } - else if (type == PacketImpl.CLUSTER_TOPOLOGY) - { - ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); - } - else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) - { - ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; - notifyTopologyChange(topMessage); - } - else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) - { - ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; - notifyTopologyChange(topMessage); - } - else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) - { - System.out.println("Channel0Handler.handlePacket"); - } - } - - /** - * @param topMessage - */ - private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) - { - final long eventUID; - final String backupGroupName; - final String scaleDownGroupName; - if (topMessage instanceof ClusterTopologyChangeMessage_V3) - { - eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID(); - backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName(); - scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName(); - } - else if (topMessage instanceof ClusterTopologyChangeMessage_V2) - { - eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID(); - backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName(); - scaleDownGroupName = null; - } - else - { - eventUID = System.currentTimeMillis(); - backupGroupName = null; - scaleDownGroupName = null; - } - - if (topMessage.isExit()) - { - if (HornetQClientLogger.LOGGER.isDebugEnabled()) - { - HornetQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down"); - } - - if (topologyResponseHandler != null) - { - topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID()); - } - } - else - { - Pair transportConfig = topMessage.getPair(); - if (transportConfig.getA() == null && transportConfig.getB() == null) - { - transportConfig = new Pair<>(conn.getTransportConnection() - .getConnectorConfig(), - null); - } - - if (topologyResponseHandler != null) - { - topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast()); - } - } - } - } - - protected PacketDecoder getPacketDecoder() - { - return ClientPacketDecoder.INSTANCE; - } - - private void forceReturnChannel1(ActiveMQException cause) - { - if (connection != null) - { - Channel channel1 = connection.getChannel(1, -1); - - if (channel1 != null) - { - channel1.returnBlocking(cause); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java deleted file mode 100644 index 103e20d..0000000 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQClientProtocolManagerFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.core.impl; - -import org.apache.activemq.spi.core.remoting.ClientProtocolManager; -import org.apache.activemq.spi.core.remoting.ClientProtocolManagerFactory; - -/** - * @author Clebert Suconic - */ - -public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory -{ - private static final long serialVersionUID = 1; - - private static final HornetQClientProtocolManagerFactory INSTANCE = new HornetQClientProtocolManagerFactory(); - - private HornetQClientProtocolManagerFactory() - { - } - - public static final HornetQClientProtocolManagerFactory getInstance() - { - return INSTANCE; - } - - public ClientProtocolManager newProtocolManager() - { - return new HornetQClientProtocolManager(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java deleted file mode 100644 index 27585e8..0000000 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/HornetQConsumerContext.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.apache.activemq.core.protocol.core.impl; - -import org.apache.activemq.spi.core.remoting.ConsumerContext; - -/** - * @author Clebert Suconic - */ - -public class HornetQConsumerContext extends ConsumerContext -{ - private long id; - - public HornetQConsumerContext(long id) - { - this.id = id; - } - - public long getId() - { - return id; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - HornetQConsumerContext that = (HornetQConsumerContext) o; - - if (id != that.id) return false; - - return true; - } - - @Override - public int hashCode() - { - return (int) (id ^ (id >>> 32)); - } -}