activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1269 Improving Session Handling after replication changes
Date Tue, 11 Jul 2017 18:45:11 GMT
ARTEMIS-1269 Improving Session Handling after replication changes

The changes made for ARTEMIS-1269 have some implications for performance;
this commit improves the packet handling.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b943a74
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b943a74
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b943a74

Branch: refs/heads/master
Commit: 4b943a745b1e91f41dc89bb924ac33378a21aa79
Parents: 7fd17f4
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Tue Jul 11 12:04:38 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Jul 11 14:23:03 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/ServerPacketDecoder.java      |  51 ++-
 .../core/ServerSessionPacketHandler.java        | 324 +++++++++++++------
 2 files changed, 279 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b943a74/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 45082b9..bcbe633 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -46,6 +46,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 
@@ -70,6 +73,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
 
@@ -78,18 +84,55 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
    private static final long serialVersionUID = 3348673114388400766L;
    public static final ServerPacketDecoder INSTANCE = new ServerPacketDecoder();
 
+   private static SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in) {
+      final SessionSendMessage sendMessage = new SessionSendMessage(new CoreMessage());
+      sendMessage.decode(in);
+      return sendMessage;
+   }
+
+   private static SessionAcknowledgeMessage decodeSessionAcknowledgeMessage(final ActiveMQBuffer
in) {
+      final SessionAcknowledgeMessage acknowledgeMessage = new SessionAcknowledgeMessage();
+      acknowledgeMessage.decode(in);
+      return acknowledgeMessage;
+   }
+
+   private static SessionRequestProducerCreditsMessage decodeRequestProducerCreditsMessage(final
ActiveMQBuffer in) {
+      final SessionRequestProducerCreditsMessage requestProducerCreditsMessage = new SessionRequestProducerCreditsMessage();
+      requestProducerCreditsMessage.decode(in);
+      return requestProducerCreditsMessage;
+   }
+
+   private static SessionConsumerFlowCreditMessage decodeSessionConsumerFlowCreditMessage(final
ActiveMQBuffer in) {
+      final SessionConsumerFlowCreditMessage sessionConsumerFlowCreditMessage = new SessionConsumerFlowCreditMessage();
+      sessionConsumerFlowCreditMessage.decode(in);
+      return sessionConsumerFlowCreditMessage;
+   }
+
    @Override
    public Packet decode(final ActiveMQBuffer in) {
       final byte packetType = in.readByte();
+      //optimized for the most common cases: hottest and commons methods will be inlined and this::decode too due to the byte code size
+      switch (packetType) {
+         case SESS_SEND:
+            return decodeSessionSendMessage(in);
+         case SESS_ACKNOWLEDGE:
+            return decodeSessionAcknowledgeMessage(in);
+         case SESS_PRODUCER_REQUEST_CREDITS:
+            return decodeRequestProducerCreditsMessage(in);
+         case SESS_FLOWTOKEN:
+            return decodeSessionConsumerFlowCreditMessage(in);
+         default:
+            return slowPathDecode(in, packetType);
+      }
+   }
 
+
+   // separating for performance reasons
+   private Packet slowPathDecode(ActiveMQBuffer in, byte packetType) {
       Packet packet;
 
       switch (packetType) {
 
-         case SESS_SEND: {
-            packet = new SessionSendMessage(new CoreMessage());
-            break;
-         }
          case SESS_SEND_LARGE: {
             packet = new SessionSendLargeMessage(new CoreMessage());
             break;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b943a74/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 0c95bed..bd97939 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -114,6 +114,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
@@ -249,11 +250,39 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       packetActor.act(packet);
    }
 
-
-   // this method is used as a listener on the packetActor
    private void onMessagePacket(final Packet packet) {
-      byte type = packet.getType();
+      if (logger.isTraceEnabled()) {
+         logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
+      }
+      final byte type = packet.getType();
+      switch (type) {
+         case SESS_SEND: {
+            onSessionSend(packet);
+            break;
+         }
+         case SESS_ACKNOWLEDGE: {
+            onSessionAcknowledge(packet);
+            break;
+         }
+         case SESS_PRODUCER_REQUEST_CREDITS: {
+            onSessionRequestProducerCredits(packet);
+            break;
+         }
+         case SESS_FLOWTOKEN: {
+            onSessionConsumerFlowCredit(packet);
+            break;
+         }
+         default:
+            // separating a method for everything else as JIT was faster this way
+            slowPacketHandler(packet);
+            break;
+      }
+   }
 
+   // This is being separated from onMessagePacket as JIT was more efficient with a small method for the
+   // hot executions.
+   private void slowPacketHandler(final Packet packet) {
+      final byte type = packet.getType();
       storageManager.setContext(session.getSessionContext());
 
       Packet response = null;
@@ -261,13 +290,23 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       boolean closeChannel = false;
       boolean requiresResponse = false;
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
-      }
-
       try {
          try {
             switch (type) {
+               case SESS_SEND_LARGE: {
+                  SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
+                  sendLarge(message.getLargeMessage());
+                  break;
+               }
+               case SESS_SEND_CONTINUATION: {
+                  SessionSendContinuationMessage message = (SessionSendContinuationMessage)
packet;
+                  requiresResponse = message.isRequiresResponse();
+                  sendContinuations(message.getPacketSize(), message.getMessageBodySize(),
message.getBody(), message.isContinues());
+                  if (requiresResponse) {
+                     response = new NullResponseMessage();
+                  }
+                  break;
+               }
                case SESS_CREATECONSUMER: {
                   SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
                   requiresResponse = request.isRequiresResponse();
@@ -385,15 +424,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   }
                   break;
                }
-               case SESS_ACKNOWLEDGE: {
-                  SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.acknowledge(message.getConsumerID(), message.getMessageID());
-                  if (requiresResponse) {
-                     response = new NullResponseMessage();
-                  }
-                  break;
-               }
                case SESS_EXPIRED: {
                   SessionExpireMessage message = (SessionExpireMessage) packet;
                   session.expire(message.getConsumerID(), message.getMessageID());
@@ -534,44 +564,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   response = new NullResponseMessage();
                   break;
                }
-               case SESS_FLOWTOKEN: {
-                  SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)
packet;
-                  session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
-                  break;
-               }
-               case SESS_SEND: {
-                  SessionSendMessage message = (SessionSendMessage) packet;
-                  requiresResponse = message.isRequiresResponse();
-                  session.send(message.getMessage(), direct);
-                  if (requiresResponse) {
-                     response = new NullResponseMessage();
-                  }
-                  break;
-               }
-               case SESS_SEND_LARGE: {
-                  SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
-                  sendLarge(message.getLargeMessage());
-                  break;
-               }
-               case SESS_SEND_CONTINUATION: {
-                  SessionSendContinuationMessage message = (SessionSendContinuationMessage)
packet;
-                  requiresResponse = message.isRequiresResponse();
-                  sendContinuations(message.getPacketSize(), message.getMessageBodySize(),
message.getBody(), message.isContinues());
-                  if (requiresResponse) {
-                     response = new NullResponseMessage();
-                  }
-                  break;
-               }
                case SESS_FORCE_CONSUMER_DELIVERY: {
                   SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet;
                   session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
                   break;
                }
-               case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS: {
-                  SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)
packet;
-                  session.requestProducerCredits(message.getAddress(), message.getCredits());
-                  break;
-               }
                case PacketImpl.SESS_ADD_METADATA: {
                   response = new NullResponseMessage();
                   SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
@@ -597,56 +594,203 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                }
             }
          } catch (ActiveMQIOErrorException e) {
-            getSession().markTXFailed(e);
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(e);
-            }
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response,
this.session);
          } catch (ActiveMQXAException e) {
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtXaException(e);
-            }
+            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
          } catch (ActiveMQQueueMaxConsumerLimitReached e) {
-            if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(e);
-            }
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse,
response);
          } catch (ActiveMQException e) {
+            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (Throwable t) {
+            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+         }
+         sendResponse(packet, response, flush, closeChannel);
+      } finally {
+         storageManager.clearContext();
+      }
+   }
+
+   private void onSessionAcknowledge(Packet packet) {
+      this.storageManager.setContext(session.getSessionContext());
+      try {
+         Packet response = null;
+         boolean requiresResponse = false;
+         try {
+            final SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
+            requiresResponse = message.isRequiresResponse();
+            this.session.acknowledge(message.getConsumerID(), message.getMessageID());
             if (requiresResponse) {
-               logger.debug("Sending exception to client", e);
-               response = new ActiveMQExceptionMessage(e);
-            } else {
-               if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
-                  logger.debug("Caught exception", e);
-               } else {
-                  ActiveMQServerLogger.LOGGER.caughtException(e);
-               }
+               response = new NullResponseMessage();
             }
+         } catch (ActiveMQIOErrorException e) {
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response,
this.session);
+         } catch (ActiveMQXAException e) {
+            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse,
response);
+         } catch (ActiveMQException e) {
+            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
          } catch (Throwable t) {
-            getSession().markTXFailed(t);
+            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+         }
+         sendResponse(packet, response, false, false);
+      } finally {
+         this.storageManager.clearContext();
+      }
+   }
+
+   private void onSessionSend(Packet packet) {
+      this.storageManager.setContext(session.getSessionContext());
+      try {
+         Packet response = null;
+         boolean requiresResponse = false;
+         try {
+            final SessionSendMessage message = (SessionSendMessage) packet;
+            requiresResponse = message.isRequiresResponse();
+            this.session.send(message.getMessage(), this.direct);
             if (requiresResponse) {
-               ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client",
t);
-               ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
-               activeMQInternalErrorException.initCause(t);
-               response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
-            } else {
-               ActiveMQServerLogger.LOGGER.caughtException(t);
+               response = new NullResponseMessage();
             }
+         } catch (ActiveMQIOErrorException e) {
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response,
this.session);
+         } catch (ActiveMQXAException e) {
+            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse,
response);
+         } catch (ActiveMQException e) {
+            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (Throwable t) {
+            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
          }
+         sendResponse(packet, response, false, false);
+      } finally {
+         this.storageManager.clearContext();
+      }
+   }
 
-         sendResponse(packet, response, flush, closeChannel);
+   private void onSessionRequestProducerCredits(Packet packet) {
+      this.storageManager.setContext(session.getSessionContext());
+      try {
+         Packet response = null;
+         boolean requiresResponse = false;
+         try {
+            SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)
packet;
+            session.requestProducerCredits(message.getAddress(), message.getCredits());
+         } catch (ActiveMQIOErrorException e) {
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response,
this.session);
+         } catch (ActiveMQXAException e) {
+            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse,
response);
+         } catch (ActiveMQException e) {
+            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (Throwable t) {
+            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+         }
+         sendResponse(packet, response, false, false);
       } finally {
-         storageManager.clearContext();
+         this.storageManager.clearContext();
+      }
+   }
+
+   private void onSessionConsumerFlowCredit(Packet packet) {
+      this.storageManager.setContext(session.getSessionContext());
+      try {
+         Packet response = null;
+         boolean requiresResponse = false;
+         try {
+            SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)
packet;
+            session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
+         } catch (ActiveMQIOErrorException e) {
+            response = onActiveMQIOErrorExceptionWhileHandlePacket(e, requiresResponse, response,
this.session);
+         } catch (ActiveMQXAException e) {
+            response = onActiveMQXAExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(e, requiresResponse,
response);
+         } catch (ActiveMQException e) {
+            response = onActiveMQExceptionWhileHandlePacket(e, requiresResponse, response);
+         } catch (Throwable t) {
+            response = onCatchThrowableWhileHandlePacket(t, requiresResponse, response, this.session);
+         }
+         sendResponse(packet, response, false, false);
+      } finally {
+         this.storageManager.clearContext();
+      }
+   }
+
+
+   private static Packet onActiveMQIOErrorExceptionWhileHandlePacket(ActiveMQIOErrorException
e,
+                                                                     boolean requiresResponse,
+                                                                     Packet response,
+                                                                     ServerSession session)
{
+      session.markTXFailed(e);
+      if (requiresResponse) {
+         logger.debug("Sending exception to client", e);
+         response = new ActiveMQExceptionMessage(e);
+      } else {
+         ActiveMQServerLogger.LOGGER.caughtException(e);
+      }
+      return response;
+   }
+
+   private static Packet onActiveMQXAExceptionWhileHandlePacket(ActiveMQXAException e,
+                                                                boolean requiresResponse,
+                                                                Packet response) {
+      if (requiresResponse) {
+         logger.debug("Sending exception to client", e);
+         response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+      } else {
+         ActiveMQServerLogger.LOGGER.caughtXaException(e);
+      }
+      return response;
+   }
+
+   private static Packet onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(ActiveMQQueueMaxConsumerLimitReached
e,
+                                                                                 boolean
requiresResponse,
+                                                                                 Packet response)
{
+      if (requiresResponse) {
+         logger.debug("Sending exception to client", e);
+         response = new ActiveMQExceptionMessage(e);
+      } else {
+         ActiveMQServerLogger.LOGGER.caughtException(e);
+      }
+      return response;
+   }
+
+   private static Packet onActiveMQExceptionWhileHandlePacket(ActiveMQException e,
+                                                              boolean requiresResponse,
+                                                              Packet response) {
+      if (requiresResponse) {
+         logger.debug("Sending exception to client", e);
+         response = new ActiveMQExceptionMessage(e);
+      } else {
+         if (e.getType() == ActiveMQExceptionType.QUEUE_EXISTS) {
+            logger.debug("Caught exception", e);
+         } else {
+            ActiveMQServerLogger.LOGGER.caughtException(e);
+         }
+      }
+      return response;
+   }
+
+   private static Packet onCatchThrowableWhileHandlePacket(Throwable t,
+                                                           boolean requiresResponse,
+                                                           Packet response,
+                                                           ServerSession session) {
+      session.markTXFailed(t);
+      if (requiresResponse) {
+         ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
+         ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
+         activeMQInternalErrorException.initCause(t);
+         response = new ActiveMQExceptionMessage(activeMQInternalErrorException);
+      } else {
+         ActiveMQServerLogger.LOGGER.caughtException(t);
       }
+      return response;
    }
 
+
+
    private void sendResponse(final Packet confirmPacket,
                              final Packet response,
                              final boolean flush,
@@ -792,12 +936,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       currentLargeMessage = largeMsg;
    }
 
-
-
    private void sendContinuations(final int packetSize,
-                                 final long messageBodySize,
-                                 final byte[] body,
-                                 final boolean continues) throws Exception {
+                                  final long messageBodySize,
+                                  final byte[] body,
+                                  final boolean continues) throws Exception {
       if (currentLargeMessage == null) {
          throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
       }
@@ -814,12 +956,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
          }
 
-
          session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false,
false);
 
          currentLargeMessage = null;
       }
    }
 
-
 }


Mime
View raw message