Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EA36F18DF9 for ; Mon, 10 Aug 2015 15:13:24 +0000 (UTC) Received: (qmail 45337 invoked by uid 500); 10 Aug 2015 15:13:02 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 45227 invoked by uid 500); 10 Aug 2015 15:13:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 44393 invoked by uid 99); 10 Aug 2015 15:13:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Aug 2015 15:13:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E460BE053F; Mon, 10 Aug 2015 15:13:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 10 Aug 2015 15:13:31 -0000 Message-Id: <70893fb7c5834c098b14c9216eefd323@git.apache.org> In-Reply-To: <6d75255d38c94b2f8601b49eb76c9d59@git.apache.org> References: <6d75255d38c94b2f8601b49eb76c9d59@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [32/53] [abbrv] [partial] activemq-artemis git commit: automatic checkstyle change http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index c73e7c9..fba1a1c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -30,45 +30,52 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; *

* A Channel *does not* support concurrent access by more than one thread! */ -public interface Channel -{ +public interface Channel { + /** * Returns the id of this channel. + * * @return the id */ long getID(); - /** For protocol check */ + /** + * For protocol check + */ boolean supports(byte packetID); /** * Sends a packet on this channel. + * * @param packet the packet to send * @return false if the packet was rejected by an outgoing interceptor; true if the send was - * successful + * successful */ boolean send(Packet packet); /** * Sends a packet on this channel using batching algorithm if appropriate + * * @param packet the packet to send * @return false if the packet was rejected by an outgoing interceptor; true if the send was - * successful + * successful */ boolean sendBatched(Packet packet); /** * Sends a packet on this channel and then blocks until it has been written to the connection. + * * @param packet the packet to send * @return false if the packet was rejected by an outgoing interceptor; true if the send was - * successful + * successful */ boolean sendAndFlush(Packet packet); /** * Sends a packet on this channel and then blocks until a response is received or a timeout * occurs. - * @param packet the packet to send + * + * @param packet the packet to send * @param expectedPacket the packet being expected. * @return the response * @throws ActiveMQException if an error occurs during the send @@ -78,6 +85,7 @@ public interface Channel /** * Sets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should * forward received packets to. + * * @param handler the handler */ void setHandler(ChannelHandler handler); @@ -85,6 +93,7 @@ public interface Channel /** * Gets the {@link org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this channel should * forward received packets to. + * * @return the current channel handler */ ChannelHandler getHandler(); @@ -100,6 +109,7 @@ public interface Channel * Transfers the connection used by this channel to the one specified. *

* All new packets will be sent via this connection. + * * @param newConnection the new connection */ void transferConnection(CoreRemotingConnection newConnection); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java index 2b357e4..a44b6d5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.core.protocol.core; - /** * A ChannelHandler is used by {@link Channel}. When a channel receives a packet it will call its handler to deal with the * packet. */ -public interface ChannelHandler -{ +public interface ChannelHandler { + /** * called by the channel when a packet is received.. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java index 35d923b..bc07019 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CommandConfirmationHandler.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.core.protocol.core; - /** * A CommandConfirmationHandler is used by the channel to confirm confirmations of packets. *

*/ -public interface CommandConfirmationHandler -{ +public interface CommandConfirmationHandler { + /** * called by channel after a confirmation has been received. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index 5123707..f2aa5b4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -19,15 +19,15 @@ package org.apache.activemq.artemis.core.protocol.core; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; - /** * Extension of RemotingConnection for the ActiveMQ Artemis core protocol */ -public interface CoreRemotingConnection extends RemotingConnection -{ +public interface CoreRemotingConnection extends RemotingConnection { - /** The client protocol used on the communication. - * This will determine if the client has support for certain packet types */ + /** + * The client protocol used on the communication. + * This will determine if the client has support for certain packet types + */ int getClientVersion(); /** @@ -40,7 +40,8 @@ public interface CoreRemotingConnection extends RemotingConnection * Returns the channel with the channel id specified. *

* If it does not exist create it with the confirmation window size. - * @param channelID the channel id + * + * @param channelID the channel id * @param confWindowSize the confirmation window size * @return the channel */ @@ -71,36 +72,42 @@ public interface CoreRemotingConnection extends RemotingConnection /** * Resets the id generator used to generate id's. + * * @param id the first id to set it to */ void syncIDGeneratorSequence(long id); /** * Returns the next id to be chosen. + * * @return the id */ long getIDGeneratorSequence(); /** * Returns the current timeout for blocking calls + * * @return the timeout in milliseconds */ long getBlockingCallTimeout(); /** * Returns the current timeout for blocking calls + * * @return the timeout in milliseconds */ long getBlockingCallFailoverTimeout(); /** * Returns the transfer lock used when transferring connections. + * * @return the lock */ Object getTransferLock(); /** * Returns the default security principal + * * @return the principal */ ActiveMQPrincipal getDefaultActiveMQPrincipal(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index 3da787c..ddb734e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -22,8 +22,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; /** * A Packet represents a packet of data transmitted over a connection. */ -public interface Packet -{ +public interface Packet { + /** * Sets the channel id that should be used once the packet has been successfully decoded it is * sent to the correct channel. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java index d938b85..73ea529 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -72,8 +72,8 @@ import org.apache.activemq.artemis.utils.VersionLoader; * Implementations of this class need to be stateless. */ -public class ActiveMQClientProtocolManager implements ClientProtocolManager -{ +public class ActiveMQClientProtocolManager implements ClientProtocolManager { + private static final String handshake = "ARTEMIS"; private final int versionID = VersionLoader.getVersion().getIncrementingVersion(); @@ -106,117 +106,89 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager private final CountDownLatch waitLatch = new CountDownLatch(1); - - public ActiveMQClientProtocolManager() - { + public ActiveMQClientProtocolManager() { } - public String getName() - { + public String getName() { return ActiveMQClient.DEFAULT_CORE_PROTOCOL; } - public void setSessionFactory(ClientSessionFactory factory) - { - this.factoryInternal = (ClientSessionFactoryInternal)factory; + public void setSessionFactory(ClientSessionFactory factory) { + this.factoryInternal = (ClientSessionFactoryInternal) factory; } - public ClientSessionFactory getSessionFactory() - { + public ClientSessionFactory getSessionFactory() { return this.factoryInternal; } @Override - public void addChannelHandlers(ChannelPipeline pipeline) - { + public void addChannelHandlers(ChannelPipeline pipeline) { pipeline.addLast("activemq-decoder", new ActiveMQFrameDecoder2()); } - public boolean waitOnLatch(long milliseconds) throws InterruptedException - { + public boolean waitOnLatch(long milliseconds) throws InterruptedException { return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS); } - public Channel getChannel0() - { - if (connection == null) - { + public Channel getChannel0() { + if (connection == null) { return null; } - else - { + else { return connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); } } - public RemotingConnection getCurrentConnection() - { + public RemotingConnection getCurrentConnection() { return connection; } - - public Channel getChannel1() - { - if (connection == null) - { + public Channel getChannel1() { + if (connection == null) { return null; } - else - { + else { return connection.getChannel(1, -1); } } - public Lock lockSessionCreation() - { - try - { + public Lock lockSessionCreation() { + try { Lock localFailoverLock = factoryInternal.lockFailover(); - try - { - if (connection == null) - { + try { + if (connection == null) { return null; } Lock lock = getChannel1().getLock(); // Lock it - this must be done while the failoverLock is held - while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS)) - { + while (isAlive() && !lock.tryLock(100, TimeUnit.MILLISECONDS)) { } return lock; } - finally - { + finally { localFailoverLock.unlock(); } // We can now release the failoverLock } - catch (InterruptedException e) - { + catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } - - public void stop() - { + public void stop() { alive = false; - - synchronized (inCreateSessionGuard) - { + synchronized (inCreateSessionGuard) { if (inCreateSessionLatch != null) inCreateSessionLatch.countDown(); } - Channel channel1 = getChannel1(); - if (channel1 != null) - { + if (channel1 != null) { channel1.returnBlocking(); } @@ -224,15 +196,12 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager } - public boolean isAlive() - { + public boolean isAlive() { return alive; } - @Override - public void ping(long connectionTTL) - { + public void ping(long connectionTTL) { Channel channel = connection.getChannel(ChannelImpl.CHANNEL_ID.PING.id, -1); Ping ping = new Ping(connectionTTL); @@ -243,37 +212,26 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager } @Override - public void sendSubscribeTopology(final boolean isServer) - { - getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, - VersionLoader.getVersion() - .getIncrementingVersion())); + public void sendSubscribeTopology(final boolean isServer) { + getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VersionLoader.getVersion().getIncrementingVersion())); } @Override - public SessionContext createSessionContext(String name, String username, String password, - boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException - { - for (Version clientVersion : VersionLoader.getClientVersions()) - { - try - { - return createSessionContext(clientVersion, - name, - username, - password, - xa, - autoCommitSends, - autoCommitAcks, - preAcknowledge, - minLargeMessageSize, - confirmationWindowSize); + public SessionContext createSessionContext(String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize) throws ActiveMQException { + for (Version clientVersion : VersionLoader.getClientVersions()) { + try { + return createSessionContext(clientVersion, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, minLargeMessageSize, confirmationWindowSize); } - catch (ActiveMQException e) - { - if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) - { + catch (ActiveMQException e) { + if (e.getType() != ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) { throw e; } } @@ -282,10 +240,16 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager throw new ActiveMQException(ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS); } - public SessionContext createSessionContext(Version clientVersion, String name, String username, String password, - boolean xa, boolean autoCommitSends, boolean autoCommitAcks, - boolean preAcknowledge, int minLargeMessageSize, int confirmationWindowSize) throws ActiveMQException - { + public SessionContext createSessionContext(Version clientVersion, + String name, + String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int minLargeMessageSize, + int confirmationWindowSize) throws ActiveMQException { if (!isAlive()) throw ActiveMQClientMessageBundle.BUNDLE.clientSessionClosed(); @@ -293,20 +257,17 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager CreateSessionResponseMessage response = null; boolean retry; - do - { + do { retry = false; Lock lock = null; - try - { + try { lock = lockSessionCreation(); // We now set a flag saying createSession is executing - synchronized (inCreateSessionGuard) - { + synchronized (inCreateSessionGuard) { if (!isAlive()) throw ActiveMQClientMessageBundle.BUNDLE.clientSessionClosed(); inCreateSession = true; @@ -315,32 +276,17 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager long sessionChannelID = connection.generateChannelID(); - Packet request = new CreateSessionMessage(name, - sessionChannelID, - clientVersion.getIncrementingVersion(), - username, - password, - minLargeMessageSize, - xa, - autoCommitSends, - autoCommitAcks, - preAcknowledge, - confirmationWindowSize, - null); - - - try - { + Packet request = new CreateSessionMessage(name, sessionChannelID, clientVersion.getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindowSize, null); + + try { // channel1 reference here has to go away response = (CreateSessionResponseMessage) getChannel1().sendBlocking(request, PacketImpl.CREATESESSION_RESP); } - catch (ActiveMQException cause) - { + catch (ActiveMQException cause) { if (!isAlive()) throw cause; - if (cause.getType() == ActiveMQExceptionType.UNBLOCKED) - { + if (cause.getType() == ActiveMQExceptionType.UNBLOCKED) { // This means the thread was blocked on create session and failover unblocked it // so failover could occur @@ -348,37 +294,29 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager continue; } - else - { + else { throw cause; } } sessionChannel = connection.getChannel(sessionChannelID, confirmationWindowSize); - } - catch (Throwable t) - { - if (lock != null) - { + catch (Throwable t) { + if (lock != null) { lock.unlock(); lock = null; } - if (t instanceof ActiveMQException) - { + if (t instanceof ActiveMQException) { throw (ActiveMQException) t; } - else - { + else { throw ActiveMQClientMessageBundle.BUNDLE.failedToCreateSession(t); } } - finally - { - if (lock != null) - { + finally { + if (lock != null) { lock.unlock(); } @@ -386,60 +324,48 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager inCreateSession = false; inCreateSessionLatch.countDown(); } - } - while (retry); - + } while (retry); // these objects won't be null, otherwise it would keep retrying on the previous loop return new ActiveMQSessionContext(name, connection, sessionChannel, response.getServerVersion(), confirmationWindowSize); } - public boolean cleanupBeforeFailover(ActiveMQException cause) - { + public boolean cleanupBeforeFailover(ActiveMQException cause) { boolean needToInterrupt; CountDownLatch exitLockLatch; Lock lock = lockSessionCreation(); - if (lock == null) - { + if (lock == null) { return false; } - try - { - synchronized (inCreateSessionGuard) - { + try { + synchronized (inCreateSessionGuard) { needToInterrupt = inCreateSession; exitLockLatch = inCreateSessionLatch; } } - finally - { + finally { lock.unlock(); } - if (needToInterrupt) - { + if (needToInterrupt) { forceReturnChannel1(cause); // Now we need to make sure that the thread has actually exited and returned it's // connections // before failover occurs - while (inCreateSession && isAlive()) - { - try - { - if (exitLockLatch != null) - { + while (inCreateSession && isAlive()) { + try { + if (exitLockLatch != null) { exitLockLatch.await(500, TimeUnit.MILLISECONDS); } } - catch (InterruptedException e1) - { + catch (InterruptedException e1) { throw new ActiveMQInterruptedException(e1); } } @@ -449,37 +375,31 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager } @Override - public boolean checkForFailover(String liveNodeID) throws ActiveMQException - { + public boolean checkForFailover(String liveNodeID) throws ActiveMQException { CheckFailoverMessage packet = new CheckFailoverMessage(liveNodeID); - CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet, - PacketImpl.CHECK_FOR_FAILOVER_REPLY); + CheckFailoverReplyMessage message = (CheckFailoverReplyMessage) getChannel1().sendBlocking(packet, PacketImpl.CHECK_FOR_FAILOVER_REPLY); return message.isOkToFailover(); } - - public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, - List incomingInterceptors, List outgoingInterceptors, - TopologyResponseHandler topologyResponseHandler) - { - this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, - callTimeout, callFailoverTimeout, - incomingInterceptors, outgoingInterceptors); + public RemotingConnection connect(Connection transportConnection, + long callTimeout, + long callFailoverTimeout, + List incomingInterceptors, + List outgoingInterceptors, + TopologyResponseHandler topologyResponseHandler) { + this.connection = new RemotingConnectionImpl(getPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors); this.topologyResponseHandler = topologyResponseHandler; getChannel0().setHandler(new Channel0Handler(connection)); - sendHandshake(transportConnection); return connection; } - private void sendHandshake(Connection transportConnection) - { - if (transportConnection.isUsingProtocolHandling()) - { + private void sendHandshake(Connection transportConnection) { + if (transportConnection.isUsingProtocolHandling()) { // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length()); amqbuffer.writeBytes(handshake.getBytes()); @@ -487,29 +407,24 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager } } + private class Channel0Handler implements ChannelHandler { - private class Channel0Handler implements ChannelHandler - { private final CoreRemotingConnection conn; - private Channel0Handler(final CoreRemotingConnection conn) - { + private Channel0Handler(final CoreRemotingConnection conn) { this.conn = conn; } - public void handlePacket(final Packet packet) - { + public void handlePacket(final Packet packet) { final byte type = packet.getType(); - if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2) - { + if (type == PacketImpl.DISCONNECT || type == PacketImpl.DISCONNECT_V2) { final DisconnectMessage msg = (DisconnectMessage) packet; String scaleDownTargetNodeID = null; SimpleString nodeID = msg.getNodeID(); - if (packet instanceof DisconnectMessage_V2) - { + if (packet instanceof DisconnectMessage_V2) { final DisconnectMessage_V2 msg_v2 = (DisconnectMessage_V2) packet; scaleDownTargetNodeID = msg_v2.getScaleDownNodeID() == null ? null : msg_v2.getScaleDownNodeID().toString(); } @@ -517,23 +432,19 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager if (topologyResponseHandler != null) topologyResponseHandler.nodeDisconnected(conn, nodeID == null ? null : nodeID.toString(), scaleDownTargetNodeID); } - else if (type == PacketImpl.CLUSTER_TOPOLOGY) - { + else if (type == PacketImpl.CLUSTER_TOPOLOGY) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; notifyTopologyChange(topMessage); } - else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) - { + else if (type == PacketImpl.CLUSTER_TOPOLOGY_V2) { ClusterTopologyChangeMessage_V2 topMessage = (ClusterTopologyChangeMessage_V2) packet; notifyTopologyChange(topMessage); } - else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) - { + else if (type == PacketImpl.CLUSTER_TOPOLOGY || type == PacketImpl.CLUSTER_TOPOLOGY_V2 || type == PacketImpl.CLUSTER_TOPOLOGY_V3) { ClusterTopologyChangeMessage topMessage = (ClusterTopologyChangeMessage) packet; notifyTopologyChange(topMessage); } - else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) - { + else if (type == PacketImpl.CHECK_FOR_FAILOVER_REPLY) { System.out.println("Channel0Handler.handlePacket"); } } @@ -541,73 +452,57 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager /** * @param topMessage */ - private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) - { + private void notifyTopologyChange(final ClusterTopologyChangeMessage topMessage) { final long eventUID; final String backupGroupName; final String scaleDownGroupName; - if (topMessage instanceof ClusterTopologyChangeMessage_V3) - { + if (topMessage instanceof ClusterTopologyChangeMessage_V3) { eventUID = ((ClusterTopologyChangeMessage_V3) topMessage).getUniqueEventID(); backupGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getBackupGroupName(); scaleDownGroupName = ((ClusterTopologyChangeMessage_V3) topMessage).getScaleDownGroupName(); } - else if (topMessage instanceof ClusterTopologyChangeMessage_V2) - { + else if (topMessage instanceof ClusterTopologyChangeMessage_V2) { eventUID = ((ClusterTopologyChangeMessage_V2) topMessage).getUniqueEventID(); backupGroupName = ((ClusterTopologyChangeMessage_V2) topMessage).getBackupGroupName(); scaleDownGroupName = null; } - else - { + else { eventUID = System.currentTimeMillis(); backupGroupName = null; scaleDownGroupName = null; } - if (topMessage.isExit()) - { - if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) - { + if (topMessage.isExit()) { + if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) { ActiveMQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down"); } - if (topologyResponseHandler != null) - { + if (topologyResponseHandler != null) { topologyResponseHandler.notifyNodeDown(eventUID, topMessage.getNodeID()); } } - else - { + else { Pair transportConfig = topMessage.getPair(); - if (transportConfig.getA() == null && transportConfig.getB() == null) - { - transportConfig = new Pair<>(conn.getTransportConnection() - .getConnectorConfig(), - null); + if (transportConfig.getA() == null && transportConfig.getB() == null) { + transportConfig = new Pair<>(conn.getTransportConnection().getConnectorConfig(), null); } - if (topologyResponseHandler != null) - { + if (topologyResponseHandler != null) { topologyResponseHandler.notifyNodeUp(eventUID, topMessage.getNodeID(), backupGroupName, scaleDownGroupName, transportConfig, topMessage.isLast()); } } } } - protected PacketDecoder getPacketDecoder() - { + protected PacketDecoder getPacketDecoder() { return ClientPacketDecoder.INSTANCE; } - private void forceReturnChannel1(ActiveMQException cause) - { - if (connection != null) - { + private void forceReturnChannel1(ActiveMQException cause) { + if (connection != null) { Channel channel1 = connection.getChannel(1, -1); - if (channel1 != null) - { + if (channel1 != null) { channel1.returnBlocking(cause); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java index 7c3435a..a58834b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManagerFactory.java @@ -19,23 +19,20 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManagerFactory; -public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManagerFactory -{ +public class ActiveMQClientProtocolManagerFactory implements ClientProtocolManagerFactory { + private static final long serialVersionUID = 1; private static final ActiveMQClientProtocolManagerFactory INSTANCE = new ActiveMQClientProtocolManagerFactory(); - private ActiveMQClientProtocolManagerFactory() - { + private ActiveMQClientProtocolManagerFactory() { } - public static final ActiveMQClientProtocolManagerFactory getInstance() - { + public static final ActiveMQClientProtocolManagerFactory getInstance() { return INSTANCE; } - public ClientProtocolManager newProtocolManager() - { + public ClientProtocolManager newProtocolManager() { return new ActiveMQClientProtocolManager(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java index 320930d..08abb91 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java @@ -18,36 +18,35 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; -public class ActiveMQConsumerContext extends ConsumerContext -{ +public class ActiveMQConsumerContext extends ConsumerContext { + private long id; - public ActiveMQConsumerContext(long id) - { + public ActiveMQConsumerContext(long id) { this.id = id; } - public long getId() - { + public long getId() { return id; } @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; ActiveMQConsumerContext that = (ActiveMQConsumerContext) o; - if (id != that.id) return false; + if (id != that.id) + return false; return true; } @Override - public int hashCode() - { + public int hashCode() { return (int) (id ^ (id >>> 32)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 23370d5..5279de2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -107,16 +107,18 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; -public class ActiveMQSessionContext extends SessionContext -{ +public class ActiveMQSessionContext extends SessionContext { + private final Channel sessionChannel; private final int serverVersion; private int confirmationWindow; private final String name; - - public ActiveMQSessionContext(String name, RemotingConnection remotingConnection, Channel sessionChannel, int serverVersion, int confirmationWindow) - { + public ActiveMQSessionContext(String name, + RemotingConnection remotingConnection, + Channel sessionChannel, + int serverVersion, + int confirmationWindow) { super(remotingConnection); this.name = name; @@ -127,71 +129,55 @@ public class ActiveMQSessionContext extends SessionContext ChannelHandler handler = new ClientSessionPacketHandler(); sessionChannel.setHandler(handler); - - if (confirmationWindow >= 0) - { + if (confirmationWindow >= 0) { sessionChannel.setCommandConfirmationHandler(confirmationHandler); } } - - private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() - { - public void commandConfirmed(final Packet packet) - { - if (packet.getType() == PacketImpl.SESS_SEND) - { + private final CommandConfirmationHandler confirmationHandler = new CommandConfirmationHandler() { + public void commandConfirmed(final Packet packet) { + if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage ssm = (SessionSendMessage) packet; callSendAck(ssm.getHandler(), ssm.getMessage()); } - else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) - { + else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) { SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; - if (!scm.isContinues()) - { + if (!scm.isContinues()) { callSendAck(scm.getHandler(), scm.getMessage()); } } } - private void callSendAck(SendAcknowledgementHandler handler, final Message message) - { - if (handler != null) - { + private void callSendAck(SendAcknowledgementHandler handler, final Message message) { + if (handler != null) { handler.sendAcknowledged(message); } - else if (sendAckHandler != null) - { + else if (sendAckHandler != null) { sendAckHandler.sendAcknowledged(message); } } }; - // Failover utility methods @Override - public void returnBlocking(ActiveMQException cause) - { + public void returnBlocking(ActiveMQException cause) { sessionChannel.returnBlocking(cause); } @Override - public void lockCommunications() - { + public void lockCommunications() { sessionChannel.lock(); } @Override - public void releaseCommunications() - { + public void releaseCommunications() { sessionChannel.setTransferring(false); sessionChannel.unlock(); } - public void cleanup() - { + public void cleanup() { sessionChannel.close(); // if the server is sending a disconnect @@ -200,14 +186,11 @@ public class ActiveMQSessionContext extends SessionContext } @Override - public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) - { + public void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits) { // nothing to be done here... Flow control here is done on the core side } - - public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) - { + public void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler) { sessionChannel.setCommandConfirmationHandler(confirmationHandler); this.sendAckHandler = handler; } @@ -215,37 +198,34 @@ public class ActiveMQSessionContext extends SessionContext public void createSharedQueue(SimpleString address, SimpleString queueName, SimpleString filterString, - boolean durable) throws ActiveMQException - { + boolean durable) throws ActiveMQException { sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE); } - public void deleteQueue(final SimpleString queueName) throws ActiveMQException - { + public void deleteQueue(final SimpleString queueName) throws ActiveMQException { sessionChannel.sendBlocking(new SessionDeleteQueueMessage(queueName), PacketImpl.NULL_RESPONSE); } - public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException - { + public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); SessionQueueQueryResponseMessage_V2 response = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); return response.toQueueQuery(); } - public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, - int windowSize, int maxRate, int ackBatchSize, boolean browseOnly, - Executor executor, Executor flowControlExecutor) throws ActiveMQException - { + public ClientConsumerInternal createConsumer(SimpleString queueName, + SimpleString filterString, + int windowSize, + int maxRate, + int ackBatchSize, + boolean browseOnly, + Executor executor, + Executor flowControlExecutor) throws ActiveMQException { long consumerID = idGenerator.generateID(); ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID); - SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, - queueName, - filterString, - browseOnly, - true); + SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true); SessionQueueQueryResponseMessage_V2 queueInfo = (SessionQueueQueryResponseMessage_V2) sessionChannel.sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP_V2); @@ -253,133 +233,93 @@ public class ActiveMQSessionContext extends SessionContext // could be overridden on the queue settings // The value we send is just a hint - return new ClientConsumerImpl(session, - consumerContext, - queueName, - filterString, - browseOnly, - calcWindowSize(windowSize), - ackBatchSize, - maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, - false) - : null, - executor, - flowControlExecutor, - this, - queueInfo.toQueueQuery(), - lookupTCCL()); - } - - - public int getServerVersion() - { + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + } + + public int getServerVersion() { return serverVersion; } - public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException - { - SessionBindingQueryResponseMessage_V2 response = - (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); + public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { + SessionBindingQueryResponseMessage_V2 response = (SessionBindingQueryResponseMessage_V2) sessionChannel.sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP_V2); return new AddressQueryImpl(response.isExists(), response.getQueueNames(), response.isAutoCreateJmsQueues()); } - @Override - public void closeConsumer(final ClientConsumer consumer) throws ActiveMQException - { + public void closeConsumer(final ClientConsumer consumer) throws ActiveMQException { sessionChannel.sendBlocking(new SessionConsumerCloseMessage(getConsumerID(consumer)), PacketImpl.NULL_RESPONSE); } - public void sendConsumerCredits(final ClientConsumer consumer, final int credits) - { + public void sendConsumerCredits(final ClientConsumer consumer, final int credits) { sessionChannel.send(new SessionConsumerFlowCreditMessage(getConsumerID(consumer), credits)); } - public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException - { + public void forceDelivery(final ClientConsumer consumer, final long sequence) throws ActiveMQException { SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(getConsumerID(consumer), sequence); sessionChannel.send(request); } - public void simpleCommit() throws ActiveMQException - { + public void simpleCommit() throws ActiveMQException { sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); } - public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException - { + public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException { sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE); } - public void sessionStart() throws ActiveMQException - { + public void sessionStart() throws ActiveMQException { sessionChannel.send(new PacketImpl(PacketImpl.SESS_START)); } - public void sessionStop() throws ActiveMQException - { + public void sessionStop() throws ActiveMQException { sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP), PacketImpl.NULL_RESPONSE); } - public void addSessionMetadata(String key, String data) throws ActiveMQException - { + public void addSessionMetadata(String key, String data) throws ActiveMQException { sessionChannel.sendBlocking(new SessionAddMetaDataMessageV2(key, data), PacketImpl.NULL_RESPONSE); } - - public void addUniqueMetaData(String key, String data) throws ActiveMQException - { + public void addUniqueMetaData(String key, String data) throws ActiveMQException { sessionChannel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data), PacketImpl.NULL_RESPONSE); } - public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException - { + public void xaCommit(Xid xid, boolean onePhase) throws XAException, ActiveMQException { SessionXACommitMessage packet = new SessionXACommitMessage(xid, onePhase); SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); - if (response.isError()) - { + if (response.isError()) { throw new XAException(response.getResponseCode()); } - if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) - { + if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) { ActiveMQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response); } } - public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException - { + public void xaEnd(Xid xid, int flags) throws XAException, ActiveMQException { Packet packet; - if (flags == XAResource.TMSUSPEND) - { + if (flags == XAResource.TMSUSPEND) { packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND); } - else if (flags == XAResource.TMSUCCESS) - { + else if (flags == XAResource.TMSUCCESS) { packet = new SessionXAEndMessage(xid, false); } - else if (flags == XAResource.TMFAIL) - { + else if (flags == XAResource.TMFAIL) { packet = new SessionXAEndMessage(xid, true); } - else - { + else { throw new XAException(XAException.XAER_INVAL); } SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); - if (response.isError()) - { + if (response.isError()) { throw new XAException(response.getResponseCode()); } } - - public void sendProducerCreditsMessage(final int credits, final SimpleString address) - { + public void sendProducerCreditsMessage(final int credits, final SimpleString address) { sessionChannel.send(new SessionRequestProducerCreditsMessage(credits, address)); } @@ -388,34 +328,31 @@ public class ActiveMQSessionContext extends SessionContext * * @return */ - public boolean supportsLargeMessage() - { + public boolean supportsLargeMessage() { return true; } @Override - public int getCreditsOnSendingFull(MessageInternal msgI) - { + public int getCreditsOnSendingFull(MessageInternal msgI) { return msgI.getEncodeSize(); } - public void sendFullMessage(MessageInternal msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException - { + public void sendFullMessage(MessageInternal msgI, + boolean sendBlocking, + SendAcknowledgementHandler handler, + SimpleString defaultAddress) throws ActiveMQException { SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking, handler); - if (sendBlocking) - { + if (sendBlocking) { sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); } - else - { + else { sessionChannel.sendBatched(packet); } } @Override - public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException - { + public int sendInitialChunkOnLargeMessage(MessageInternal msgI) throws ActiveMQException { SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(msgI); sessionChannel.send(initialChunk); @@ -424,89 +361,78 @@ public class ActiveMQSessionContext extends SessionContext } @Override - public int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException - { + public int sendLargeMessageChunk(MessageInternal msgI, + long messageBodySize, + boolean sendBlocking, + boolean lastChunk, + byte[] chunk, + SendAcknowledgementHandler messageHandler) throws ActiveMQException { final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = - new SessionSendContinuationMessage(msgI, chunk, !lastChunk, - requiresResponse, messageBodySize, messageHandler); + final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); - if (requiresResponse) - { + if (requiresResponse) { // When sending it blocking, only the last chunk will be blocking. sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); } - else - { + else { sessionChannel.send(chunkPacket); } return chunkPacket.getPacketSize(); } - public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws ActiveMQException - { + public void sendACK(boolean individual, + boolean block, + final ClientConsumer consumer, + final Message message) throws ActiveMQException { PacketImpl messagePacket; - if (individual) - { + if (individual) { messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); } - else - { + else { messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); } - if (block) - { + if (block) { sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE); } - else - { + else { sessionChannel.sendBatched(messagePacket); } } - public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException - { + public void expireMessage(final ClientConsumer consumer, Message message) throws ActiveMQException { SessionExpireMessage messagePacket = new SessionExpireMessage(getConsumerID(consumer), message.getMessageID()); sessionChannel.send(messagePacket); } - - public void sessionClose() throws ActiveMQException - { + public void sessionClose() throws ActiveMQException { sessionChannel.sendBlocking(new SessionCloseMessage(), PacketImpl.NULL_RESPONSE); } - public void xaForget(Xid xid) throws XAException, ActiveMQException - { + public void xaForget(Xid xid) throws XAException, ActiveMQException { SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(new SessionXAForgetMessage(xid), PacketImpl.SESS_XA_RESP); - if (response.isError()) - { + if (response.isError()) { throw new XAException(response.getResponseCode()); } } - public int xaPrepare(Xid xid) throws XAException, ActiveMQException - { + public int xaPrepare(Xid xid) throws XAException, ActiveMQException { SessionXAPrepareMessage packet = new SessionXAPrepareMessage(xid); SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); - if (response.isError()) - { + if (response.isError()) { throw new XAException(response.getResponseCode()); } - else - { + else { return response.getResponseCode(); } } - public Xid[] xaScan() throws ActiveMQException - { + public Xid[] xaScan() throws ActiveMQException { SessionXAGetInDoubtXidsResponseMessage response = (SessionXAGetInDoubtXidsResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS), PacketImpl.SESS_XA_INDOUBT_XIDS_RESP); List xids = response.getXids(); @@ -516,71 +442,63 @@ public class ActiveMQSessionContext extends SessionContext return xidArray; } - public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException - { + public void xaRollback(Xid xid, boolean wasStarted) throws ActiveMQException, XAException { SessionXARollbackMessage packet = new SessionXARollbackMessage(xid); SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); - if (response.isError()) - { + if (response.isError()) { throw new XAException(response.getResponseCode()); } } - public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException - { + public void xaStart(Xid xid, int flags) throws XAException, ActiveMQException { Packet packet; - if (flags == XAResource.TMJOIN) - { + if (flags == XAResource.TMJOIN) { packet = new SessionXAJoinMessage(xid); } - else if (flags == XAResource.TMRESUME) - { + else if (flags == XAResource.TMRESUME) { packet = new SessionXAResumeMessage(xid); } - else if (flags == XAResource.TMNOFLAGS) - { + else if (flags == XAResource.TMNOFLAGS) { // Don't need to flush since the previous end will have done this packet = new SessionXAStartMessage(xid); } - else - { + else { throw new XAException(XAException.XAER_INVAL); } SessionXAResponseMessage response = (SessionXAResponseMessage) sessionChannel.sendBlocking(packet, PacketImpl.SESS_XA_RESP); - if (response.isError()) - { + if (response.isError()) { ActiveMQClientLogger.LOGGER.errorCallingStart(response.getMessage(), response.getResponseCode()); throw new XAException(response.getResponseCode()); } } - public boolean configureTransactionTimeout(int seconds) throws ActiveMQException - { + public boolean configureTransactionTimeout(int seconds) throws ActiveMQException { SessionXASetTimeoutResponseMessage response = (SessionXASetTimeoutResponseMessage) sessionChannel.sendBlocking(new SessionXASetTimeoutMessage(seconds), PacketImpl.SESS_XA_SET_TIMEOUT_RESP); return response.isOK(); } - public int recoverSessionTimeout() throws ActiveMQException - { + public int recoverSessionTimeout() throws ActiveMQException { SessionXAGetTimeoutResponseMessage response = (SessionXAGetTimeoutResponseMessage) sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT), PacketImpl.SESS_XA_GET_TIMEOUT_RESP); return response.getTimeoutSeconds(); } - public void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString, boolean durable, boolean temp) throws ActiveMQException - { + public void createQueue(SimpleString address, + SimpleString queueName, + SimpleString filterString, + boolean durable, + boolean temp) throws ActiveMQException { CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true); sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE); } @Override - public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException - { + public boolean reattachOnNewConnection(RemotingConnection newConnection) throws ActiveMQException { this.remotingConnection = newConnection; @@ -592,8 +510,7 @@ public class ActiveMQSessionContext extends SessionContext ReattachSessionResponseMessage response = (ReattachSessionResponseMessage) channel1.sendBlocking(request, PacketImpl.REATTACH_SESSION_RESP); - if (response.isReattached()) - { + if (response.isReattached()) { ActiveMQClientLogger.LOGGER.replayingCommands(sessionChannel.getID(), response.getLastConfirmedCommandID()); // The session was found on the server - we reattached transparently ok @@ -601,8 +518,7 @@ public class ActiveMQSessionContext extends SessionContext return true; } - else - { + else { ActiveMQClientLogger.LOGGER.reconnectCreatingNewSession(sessionChannel.getID()); sessionChannel.clearCommands(); @@ -619,148 +535,102 @@ public class ActiveMQSessionContext extends SessionContext final boolean autoCommitSends, final boolean autoCommitAcks, final boolean preAcknowledge, - final SimpleString defaultAddress) throws ActiveMQException - { - Packet createRequest = new CreateSessionMessage(name, - sessionChannel.getID(), - VersionLoader.getVersion().getIncrementingVersion(), - username, - password, - minLargeMessageSize, - xa, - autoCommitSends, - autoCommitAcks, - preAcknowledge, - confirmationWindow, - defaultAddress == null ? null - : defaultAddress.toString()); + final SimpleString defaultAddress) throws ActiveMQException { + Packet createRequest = new CreateSessionMessage(name, sessionChannel.getID(), VersionLoader.getVersion().getIncrementingVersion(), username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, confirmationWindow, defaultAddress == null ? null : defaultAddress.toString()); boolean retry; - do - { - try - { + do { + try { getCreateChannel().sendBlocking(createRequest, PacketImpl.CREATESESSION_RESP); retry = false; } - catch (ActiveMQException e) - { + catch (ActiveMQException e) { // the session was created while its server was starting, retry it: - if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) - { + if (e.getType() == ActiveMQExceptionType.SESSION_CREATION_REJECTED) { ActiveMQClientLogger.LOGGER.retryCreateSessionSeverStarting(name); retry = true; // sleep a little bit to avoid spinning too much - try - { + try { Thread.sleep(10); } - catch (InterruptedException ie) - { + catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw e; } } - else - { + else { throw e; } } - } - while (retry && !session.isClosing()); + } while (retry && !session.isClosing()); } @Override - public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException - { + public void recreateConsumerOnServer(ClientConsumerInternal consumerInternal) throws ActiveMQException { ClientSession.QueueQuery queueInfo = consumerInternal.getQueueInfo(); // We try and recreate any non durable queues, since they probably won't be there unless // they are defined in broker.xml // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover - if (!queueInfo.isDurable()) - { - CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), - queueInfo.getName(), - queueInfo.getFilterString(), - false, - queueInfo.isTemporary(), - false); + if (!queueInfo.isDurable()) { + CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false); sendPacketWithoutLock(sessionChannel, createQueueRequest); } - SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), - consumerInternal.getQueueName(), - consumerInternal.getFilterString(), - consumerInternal.isBrowseOnly(), - false); + SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false); sendPacketWithoutLock(sessionChannel, createConsumerRequest); int clientWindowSize = consumerInternal.getClientWindowSize(); - if (clientWindowSize != 0) - { - SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), - clientWindowSize); + if (clientWindowSize != 0) { + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), clientWindowSize); sendPacketWithoutLock(sessionChannel, packet); } - else - { + else { // https://jira.jboss.org/browse/HORNETQ-522 - SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), - 1); + SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(getConsumerID(consumerInternal), 1); sendPacketWithoutLock(sessionChannel, packet); } } - public void xaFailed(Xid xid) throws ActiveMQException - { + public void xaFailed(Xid xid) throws ActiveMQException { sendPacketWithoutLock(sessionChannel, new SessionXAAfterFailedMessage(xid)); } - public void restartSession() throws ActiveMQException - { + public void restartSession() throws ActiveMQException { sendPacketWithoutLock(sessionChannel, new PacketImpl(PacketImpl.SESS_START)); } @Override - public void resetMetadata(HashMap metaDataToSend) - { + public void resetMetadata(HashMap metaDataToSend) { // Resetting the metadata after failover - for (Map.Entry entries : metaDataToSend.entrySet()) - { + for (Map.Entry entries : metaDataToSend.entrySet()) { sendPacketWithoutLock(sessionChannel, new SessionAddMetaDataMessageV2(entries.getKey(), entries.getValue(), false)); } } - - private Channel getCreateChannel() - { + private Channel getCreateChannel() { return getCoreConnection().getChannel(1, -1); } - private CoreRemotingConnection getCoreConnection() - { + private CoreRemotingConnection getCoreConnection() { return (CoreRemotingConnection) remotingConnection; } - /** * This doesn't apply to other protocols probably, so it will be an ActiveMQ Artemis exclusive feature * * @throws ActiveMQException */ - private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws ActiveMQException - { + private void handleConsumerDisconnected(DisconnectConsumerMessage packet) throws ActiveMQException { DisconnectConsumerMessage message = packet; session.handleConsumerDisconnect(new ActiveMQConsumerContext(message.getConsumerId())); } - private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception - { + private void handleReceivedMessagePacket(SessionReceiveMessage messagePacket) throws Exception { ClientMessageInternal msgi = (ClientMessageInternal) messagePacket.getMessage(); msgi.setDeliveryCount(messagePacket.getDeliveryCount()); @@ -770,8 +640,7 @@ public class ActiveMQSessionContext extends SessionContext handleReceiveMessage(new ActiveMQConsumerContext(messagePacket.getConsumerID()), msgi); } - private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception - { + private void handleReceiveLargeMessage(SessionReceiveLargeMessage serverPacket) throws Exception { ClientLargeMessageInternal clientLargeMessage = (ClientLargeMessageInternal) serverPacket.getLargeMessage(); clientLargeMessage.setFlowControlSize(serverPacket.getPacketSize()); @@ -781,73 +650,55 @@ public class ActiveMQSessionContext extends SessionContext handleReceiveLargeMessage(new ActiveMQConsumerContext(serverPacket.getConsumerID()), clientLargeMessage, serverPacket.getLargeMessageSize()); } - - private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception - { - handleReceiveContinuation(new ActiveMQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(), - continuationPacket.isContinues()); + private void handleReceiveContinuation(SessionReceiveContinuationMessage continuationPacket) throws Exception { + handleReceiveContinuation(new ActiveMQConsumerContext(continuationPacket.getConsumerID()), continuationPacket.getBody(), continuationPacket.getPacketSize(), continuationPacket.isContinues()); } - - protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) - { + protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) { handleReceiveProducerCredits(message.getAddress(), message.getCredits()); } - - protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message) - { + protected void handleReceiveProducerFailCredits(SessionProducerCreditsFailMessage message) { handleReceiveProducerFailCredits(message.getAddress(), message.getCredits()); } - class ClientSessionPacketHandler implements ChannelHandler - { + class ClientSessionPacketHandler implements ChannelHandler { - public void handlePacket(final Packet packet) - { + public void handlePacket(final Packet packet) { byte type = packet.getType(); - try - { - switch (type) - { - case DISCONNECT_CONSUMER: - { + try { + switch (type) { + case DISCONNECT_CONSUMER: { handleConsumerDisconnected((DisconnectConsumerMessage) packet); break; } - case SESS_RECEIVE_CONTINUATION: - { + case SESS_RECEIVE_CONTINUATION: { handleReceiveContinuation((SessionReceiveContinuationMessage) packet); break; } - case SESS_RECEIVE_MSG: - { + case SESS_RECEIVE_MSG: { handleReceivedMessagePacket((SessionReceiveMessage) packet); break; } - case SESS_RECEIVE_LARGE_MSG: - { + case SESS_RECEIVE_LARGE_MSG: { handleReceiveLargeMessage((SessionReceiveLargeMessage) packet); break; } - case PacketImpl.SESS_PRODUCER_CREDITS: - { + case PacketImpl.SESS_PRODUCER_CREDITS: { handleReceiveProducerCredits((SessionProducerCreditsMessage) packet); break; } - case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: - { + case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: { handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet); break; } - case EXCEPTION: - { + case EXCEPTION: { // We can only log these exceptions // maybe we should cache it on SessionContext and throw an exception on any next calls ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet; @@ -856,14 +707,12 @@ public class ActiveMQSessionContext extends SessionContext break; } - default: - { + default: { throw new IllegalStateException("Invalid packet: " + type); } } } - catch (Exception e) - { + catch (Exception e) { ActiveMQClientLogger.LOGGER.failedToHandlePacket(e); } @@ -871,58 +720,46 @@ public class ActiveMQSessionContext extends SessionContext } } - private long getConsumerID(ClientConsumer consumer) - { - return ((ActiveMQConsumerContext)consumer.getConsumerContext()).getId(); + private long getConsumerID(ClientConsumer consumer) { + return ((ActiveMQConsumerContext) consumer.getConsumerContext()).getId(); } - private ClassLoader lookupTCCL() - { - return AccessController.doPrivileged(new PrivilegedAction() - { - public ClassLoader run() - { + private ClassLoader lookupTCCL() { + return AccessController.doPrivileged(new PrivilegedAction() { + public ClassLoader run() { return Thread.currentThread().getContextClassLoader(); } }); } - private int calcWindowSize(final int windowSize) - { + private int calcWindowSize(final int windowSize) { int clientWindowSize; - if (windowSize == -1) - { + if (windowSize == -1) { // No flow control - buffer can increase without bound! Only use with // caution for very fast consumers clientWindowSize = -1; } - else if (windowSize == 0) - { + else if (windowSize == 0) { // Slow consumer - no buffering clientWindowSize = 0; } - else if (windowSize == 1) - { + else if (windowSize == 1) { // Slow consumer = buffer 1 clientWindowSize = 1; } - else if (windowSize > 1) - { + else if (windowSize > 1) { // Client window size is half server window size clientWindowSize = windowSize >> 1; } - else - { + else { throw ActiveMQClientMessageBundle.BUNDLE.invalidWindowSize(windowSize); } return clientWindowSize; } - - private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet) - { + private void sendPacketWithoutLock(final Channel parameterChannel, final Packet packet) { packet.setChannelID(parameterChannel.getID()); Connection conn = parameterChannel.getConnection().getTransportConnection(); @@ -932,5 +769,4 @@ public class ActiveMQSessionContext extends SessionContext conn.write(buffer, false, false); } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bac96047/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java index 2b32f1f..9419796 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/BackwardsCompatibilityUtils.java @@ -23,25 +23,21 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember; /** * This is a utility class to house any HornetQ client specific backwards compatibility methods. - * */ -public class BackwardsCompatibilityUtils -{ +public class BackwardsCompatibilityUtils { + private static int INITIAL_ACTIVEMQ_INCREMENTING_VERSION = 126; - public static Pair getTCPair(int clientIncrementingVersion, TopologyMember member) - { - if (clientIncrementingVersion < INITIAL_ACTIVEMQ_INCREMENTING_VERSION) - { + public static Pair getTCPair(int clientIncrementingVersion, + TopologyMember member) { + if (clientIncrementingVersion < INITIAL_ACTIVEMQ_INCREMENTING_VERSION) { return new Pair<>(replaceClassName(member.getLive()), replaceClassName(member.getBackup())); } return new Pair<>(member.getLive(), member.getBackup()); } - private static TransportConfiguration replaceClassName(TransportConfiguration tc) - { - if (tc != null) - { + private static TransportConfiguration replaceClassName(TransportConfiguration tc) { + if (tc != null) { String className = tc.getFactoryClassName().replace("org.apache.activemq.artemis", "org.hornetq").replace("ActiveMQ", "HornetQ"); return new TransportConfiguration(className, tc.getParams(), tc.getName()); }