activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Christian Posta <christian.po...@gmail.com>
Subject Re: Producer Window Size
Date Tue, 25 Jun 2013 18:17:43 GMT
yes. if you send asynchronously (tx or non-persist messages by default),
you can enable producer window to not block the connection on producer flow
control.


On Tue, Jun 25, 2013 at 4:14 AM, bizcenter <bizcenternet@gmail.com> wrote:

> In org.apache.activemq.broker.region.Queue#send(ProducerBrokerExchange,
> Message), it just determine that the producerWindowSize is greater than 0,
> so set producerWindowSize to 1024 or 10240 can do the same effect??? am i
> right?
>
> public void send(final ProducerBrokerExchange producerExchange, final
> Message message) throws Exception {
>         final ConnectionContext context =
> producerExchange.getConnectionContext();
>         // There is delay between the client sending it and it arriving at
> the
>         // destination.. it may have expired.
>         message.setRegionDestination(this);
>         ProducerState state = producerExchange.getProducerState();
>         if (state == null) {
>             LOG.warn("Send failed for: " + message + ",  missing producer
> state for: " + producerExchange);
>             throw new JMSException("Cannot send message to " +
> getActiveMQDestination() + " with invalid (null) producer state");
>         }
>         final ProducerInfo producerInfo =
> producerExchange.getProducerState().getInfo();
>       *  final boolean sendProducerAck = !message.isResponseRequired() &&
> producerInfo.getWindowSize() > 0*
>                 && !context.isInRecoveryMode();
>         if (message.isExpired()) {
>             // message not stored - or added to stats yet - so chuck here
>             broker.getRoot().messageExpired(context, message, null);
>             if (sendProducerAck) {
>                 ProducerAck ack = new
> ProducerAck(producerInfo.getProducerId(), message.getSize());
>                 context.getConnection().dispatchAsync(ack);
>             }
>             return;
>         }
>         if (memoryUsage.isFull()) {
>             isFull(context, memoryUsage);
>             fastProducer(context, producerInfo);
>             if (isProducerFlowControl() && context.isProducerFlowControl())
> {
>                 if (warnOnProducerFlowControl) {
>                     warnOnProducerFlowControl = false;
>                     LOG
>                             .info("Usage Manager Memory Limit ("
>                                     + memoryUsage.getLimit()
>                                     + ") reached on "
>                                     +
> getActiveMQDestination().getQualifiedName()
>                                     + ". Producers will be throttled to the
> rate at which messages are removed from this destination to prevent
> flooding
> it."
>                                     + " See
> http://activemq.apache.org/producer-flow-control.html for more info");
>                 }
>
>                 if (!context.isNetworkConnection() &&
> systemUsage.isSendFailIfNoSpace()) {
>                     throw new ResourceAllocationException("Usage Manager
> Memory Limit reached. Stopping producer ("
>                             + message.getProducerId() + ") to prevent
> flooding "
>                             + getActiveMQDestination().getQualifiedName() +
> "."
>                             + " See
> http://activemq.apache.org/producer-flow-control.html for more info");
>                 }
>
>                 // We can avoid blocking due to low usage if the producer
> is
>                 // sending
>                 // a sync message or if it is using a producer window
>                 *if (producerInfo.getWindowSize() > 0 ||
> message.isResponseRequired()) {*
>                     // copy the exchange state since the context will be
>                     // modified while we are waiting
>                     // for space.
>                     final ProducerBrokerExchange producerExchangeCopy =
> producerExchange.copy();
>                     synchronized (messagesWaitingForSpace) {
>                      // Start flow control timeout task
>                         // Prevent trying to start it multiple times
>                         if (!flowControlTimeoutTask.isAlive()) {
>                             flowControlTimeoutTask.setName(getName()+"
> Producer Flow Control Timeout Task");
>                             flowControlTimeoutTask.start();
>                         }
>                         messagesWaitingForSpace.put(message.getMessageId(),
> new Runnable() {
>                             public void run() {
>
>                                 try {
>                                     // While waiting for space to free
> up...
> the
>                                     // message may have expired.
>                                     if (message.isExpired()) {
>                                         LOG.error("expired waiting for
> space..");
>                                         broker.messageExpired(context,
> message, null);
>
> destinationStatistics.getExpired().increment();
>                                     } else {
>                                         doMessageSend(producerExchangeCopy,
> message);
>                                     }
>
>                                     if (sendProducerAck) {
>                                         ProducerAck ack = new
> ProducerAck(producerInfo.getProducerId(), message
>                                                 .getSize());
>
> context.getConnection().dispatchAsync(ack);
>                                     } else {
>                                         Response response = new Response();
>
> response.setCorrelationId(message.getCommandId());
>
> context.getConnection().dispatchAsync(response);
>                                     }
>
>                                 } catch (Exception e) {
>                                     if (!sendProducerAck &&
> !context.isInRecoveryMode()) {
>                                         ExceptionResponse response = new
> ExceptionResponse(e);
>
> response.setCorrelationId(message.getCommandId());
>
> context.getConnection().dispatchAsync(response);
>                                     } else {
>                                         LOG.debug("unexpected exception on
> deferred send of :" + message, e);
>                                     }
>                                 }
>                             }
>                         });
>
>                         if (!context.isNetworkConnection() &&
> systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
>                             flowControlTimeoutMessages.add(new
> TimeoutMessage(message, context, systemUsage
>                                     .getSendFailIfNoSpaceAfterTimeout()));
>                         }
>
>                         registerCallbackForNotFullNotification();
>                         context.setDontSendReponse(true);
>                         return;
>                     }
>
>                 } else {
>
>                     if (memoryUsage.isFull()) {
>                         waitForSpace(context, memoryUsage, "Usage Manager
> Memory Limit reached. Producer ("
>                                 + message.getProducerId() + ") stopped to
> prevent flooding "
>                                 +
> getActiveMQDestination().getQualifiedName() + "."
>                                 + " See
> http://activemq.apache.org/producer-flow-control.html for more info");
>                     }
>
>                     // The usage manager could have delayed us by the time
>                     // we unblock the message could have expired..
>                     if (message.isExpired()) {
>                         if (LOG.isDebugEnabled()) {
>                             LOG.debug("Expired message: " + message);
>                         }
>                         broker.getRoot().messageExpired(context, message,
> null);
>                         return;
>                     }
>                 }
>             }
>         }
>         doMessageSend(producerExchange, message);
>         if (sendProducerAck) {
>             ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
> message.getSize());
>             context.getConnection().dispatchAsync(ack);
>         }
>     }
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Producer-Window-Size-tp4668547.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message