activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bizcenter <bizcenter...@gmail.com>
Subject Producer Window Size
Date Tue, 25 Jun 2013 11:14:09 GMT
In org.apache.activemq.broker.region.Queue#send(ProducerBrokerExchange,
Message), the broker only checks whether producerWindowSize is greater than 0,
so setting producerWindowSize to 1024 or to 10240 should have the same effect
in this method. Am I right?

/**
 * Handles a message sent by a producer to this destination.
 *
 * <p>The method rejects sends with no producer state, discards messages that
 * have already expired, applies the memory-limit handling when
 * {@code memoryUsage} is full and producer flow control is enabled, and
 * otherwise hands the message to {@code doMessageSend}. A {@code ProducerAck}
 * is dispatched when {@code sendProducerAck} is true.
 *
 * <p>NOTE(review): this listing was hard-wrapped by a mail client, so several
 * string literals and comments are split across lines, and the {@code *}
 * characters around two statements look like emphasis added by the poster
 * rather than Java syntax.
 *
 * @param producerExchange exchange supplying the connection context and the
 *                         producer state for this send
 * @param message          the message being sent
 * @throws Exception a {@code JMSException} when the producer state is null; a
 *                   {@code ResourceAllocationException} when memory is full,
 *                   flow control is active, the connection is not a network
 *                   connection and {@code systemUsage.isSendFailIfNoSpace()}
 *                   is true; other exceptions may come from the helpers called
 */
public void send(final ProducerBrokerExchange producerExchange, final
Message message) throws Exception {
        final ConnectionContext context =
producerExchange.getConnectionContext();
        // There is delay between the client sending it and it arriving at
the
        // destination.. it may have expired.
        message.setRegionDestination(this);
        ProducerState state = producerExchange.getProducerState();
        if (state == null) {
            LOG.warn("Send failed for: " + message + ",  missing producer
state for: " + producerExchange);
            throw new JMSException("Cannot send message to " +
getActiveMQDestination() + " with invalid (null) producer state");
        }
        final ProducerInfo producerInfo =
producerExchange.getProducerState().getInfo();
        // A ProducerAck is used only when the message does not require a
        // response, the producer's window size is greater than zero and the
        // context is not in recovery mode. Only the sign of the window size
        // is tested here, not its magnitude.
      *  final boolean sendProducerAck = !message.isResponseRequired() &&
producerInfo.getWindowSize() > 0*
                && !context.isInRecoveryMode();
        if (message.isExpired()) {
            // message not stored - or added to stats yet - so chuck here
            broker.getRoot().messageExpired(context, message, null);
            if (sendProducerAck) {
                ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), message.getSize());
                context.getConnection().dispatchAsync(ack);
            }
            return;
        }
        // Memory-limit path: entered only while memoryUsage reports full.
        if (memoryUsage.isFull()) {
            isFull(context, memoryUsage);
            fastProducer(context, producerInfo);
            if (isProducerFlowControl() && context.isProducerFlowControl())
{
                // The flag is cleared on first use, so this notice is logged
                // once until something else sets the flag again.
                if (warnOnProducerFlowControl) {
                    warnOnProducerFlowControl = false;
                    LOG
                            .info("Usage Manager Memory Limit ("
                                    + memoryUsage.getLimit()
                                    + ") reached on "
                                    +
getActiveMQDestination().getQualifiedName()
                                    + ". Producers will be throttled to the
rate at which messages are removed from this destination to prevent flooding
it."
                                    + " See
http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // Fail the send immediately instead of waiting when this is
                // not a network connection and sendFailIfNoSpace is set.
                if (!context.isNetworkConnection() &&
systemUsage.isSendFailIfNoSpace()) {
                    throw new ResourceAllocationException("Usage Manager
Memory Limit reached. Stopping producer ("
                            + message.getProducerId() + ") to prevent
flooding "
                            + getActiveMQDestination().getQualifiedName() +
"."
                            + " See
http://activemq.apache.org/producer-flow-control.html for more info");
                }

                // We can avoid blocking due to low usage if the producer is
                // sending
                // a sync message or if it is using a producer window
                *if (producerInfo.getWindowSize() > 0 ||
message.isResponseRequired()) {*
                    // copy the exchange state since the context will be
                    // modified while we are waiting
                    // for space.
                    final ProducerBrokerExchange producerExchangeCopy =
producerExchange.copy();
                    synchronized (messagesWaitingForSpace) {
                     // Start flow control timeout task
                        // Prevent trying to start it multiple times
                        if (!flowControlTimeoutTask.isAlive()) {
                            flowControlTimeoutTask.setName(getName()+"
Producer Flow Control Timeout Task");
                            flowControlTimeoutTask.start();
                        }
                        // Register the deferred send under the message id.
                        // Presumably it is run once space frees up; the code
                        // that triggers it is not visible here.
                        messagesWaitingForSpace.put(message.getMessageId(),
new Runnable() {
                            public void run() {

                                try {
                                    // While waiting for space to free up...
the
                                    // message may have expired.
                                    if (message.isExpired()) {
                                        LOG.error("expired waiting for
space..");
                                        broker.messageExpired(context,
message, null);
                                       
destinationStatistics.getExpired().increment();
                                    } else {
                                        doMessageSend(producerExchangeCopy,
message);
                                    }

                                    // Complete the send towards the producer:
                                    // a ProducerAck when acks are in use,
                                    // otherwise a Response correlated to the
                                    // message's command id.
                                    if (sendProducerAck) {
                                        ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), message
                                                .getSize());
                                       
context.getConnection().dispatchAsync(ack);
                                    } else {
                                        Response response = new Response();
                                       
response.setCorrelationId(message.getCommandId());
                                       
context.getConnection().dispatchAsync(response);
                                    }

                                } catch (Exception e) {
                                    // An ExceptionResponse is dispatched only
                                    // when acks are not in use and the context
                                    // is not in recovery mode; otherwise the
                                    // failure is only logged at debug level.
                                    if (!sendProducerAck &&
!context.isInRecoveryMode()) {
                                        ExceptionResponse response = new
ExceptionResponse(e);
                                       
response.setCorrelationId(message.getCommandId());
                                       
context.getConnection().dispatchAsync(response);
                                    } else {
                                        LOG.debug("unexpected exception on
deferred send of :" + message, e);
                                    }
                                }
                            }
                        });

                        // With a non-zero sendFailIfNoSpaceAfterTimeout on a
                        // non-network connection, queue a TimeoutMessage
                        // carrying that timeout for this message.
                        if (!context.isNetworkConnection() &&
systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
                            flowControlTimeoutMessages.add(new
TimeoutMessage(message, context, systemUsage
                                    .getSendFailIfNoSpaceAfterTimeout()));
                        }

                        registerCallbackForNotFullNotification();
                        // Presumably suppresses the immediate response, since
                        // the deferred Runnable above dispatches the ack or
                        // response itself - confirm against ConnectionContext.
                        context.setDontSendReponse(true);
                        return;
                    }

                } else {

                    // No producer window and no response required:
                    // waitForSpace is called while memory is still full
                    // (presumably blocking the caller - confirm).
                    if (memoryUsage.isFull()) {
                        waitForSpace(context, memoryUsage, "Usage Manager
Memory Limit reached. Producer ("
                                + message.getProducerId() + ") stopped to
prevent flooding "
                                +
getActiveMQDestination().getQualifiedName() + "."
                                + " See
http://activemq.apache.org/producer-flow-control.html for more info");
                    }

                    // The usage manager could have delayed us by the time
                    // we unblock the message could have expired..
                    if (message.isExpired()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Expired message: " + message);
                        }
                        broker.getRoot().messageExpired(context, message,
null);
                        return;
                    }
                }
            }
        }
        // Reached when no branch above returned or threw: perform the send
        // and dispatch a ProducerAck if acks are in use.
        doMessageSend(producerExchange, message);
        if (sendProducerAck) {
            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
message.getSize());
            context.getConnection().dispatchAsync(ack);
        }
    }



--
View this message in context: http://activemq.2283324.n4.nabble.com/Producer-Window-Size-tp4668547.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message