activemq-dev mailing list archives

From Gary Tully <gary.tu...@gmail.com>
Subject Re: svn commit: r1413846 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/test/j...
Date Tue, 27 Nov 2012 11:27:07 GMT
There may be too much removed here. The adminView exposure of isSlave was
driven by shared file system master/slave, not pure master/slave. One
related issue is https://issues.apache.org/jira/browse/AMQ-3696. I'm not
sure where the original requirement came from, but it makes sense to have
some JMX presence for a broker that is waiting to lock a store. Even if the
broker is waiting to get a shared broker lock, it is nice to see that.
W.r.t. the web console, its lifecycle can be independent of the broker, so
I think it is good that it can reflect a slave status or a "trying to
obtain lock" status.
Maybe the solution here is some JMX instrumentation on the abstract
pluggable locker.
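
To make that last idea concrete, here is a rough sketch of what such instrumentation might look like. It uses only the standard javax.management API. Every name in it (LockerStatusSketch, LockerViewMBean, LockerView, the "LockState" attribute and the org.example.sketch ObjectName) is hypothetical and is not existing ActiveMQ code; a real version would hang off whatever locker abstraction the broker ends up with.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

class LockerStatusSketch {

    /** Hypothetical management interface; not part of ActiveMQ. */
    public interface LockerViewMBean {
        boolean isLockHeld();
        String getLockState();
    }

    /** Hypothetical view that a locker implementation would update as it runs. */
    public static class LockerView implements LockerViewMBean {
        private final AtomicBoolean lockHeld = new AtomicBoolean(false);

        // Called by the (assumed) locker once the store lock is obtained.
        public void lockAcquired() { lockHeld.set(true); }

        // Called by the (assumed) locker if the lock is lost again.
        public void lockLost() { lockHeld.set(false); }

        @Override
        public boolean isLockHeld() { return lockHeld.get(); }

        @Override
        public String getLockState() {
            return lockHeld.get() ? "LOCK_HELD" : "WAITING_FOR_LOCK";
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Illustrative ObjectName only; a broker would use its own JMX domain.
        ObjectName name = new ObjectName("org.example.sketch:type=Locker,name=demo");
        LockerView view = new LockerView();

        // Register before the lock is held, so a waiting broker is already visible.
        server.registerMBean(new StandardMBean(view, LockerViewMBean.class), name);
        System.out.println(server.getAttribute(name, "LockState"));

        view.lockAcquired();
        System.out.println(server.getAttribute(name, "LockState"));

        server.unregisterMBean(name);
    }
}
```

The point of registering the view before the lock is obtained is that a console whose lifecycle is independent of the broker could poll the attribute and show a "trying to obtain lock" status without needing isSlave on the broker view.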


On 26 November 2012 21:13, <chirino@apache.org> wrote:

> Author: chirino
> Date: Mon Nov 26 21:13:25 2012
> New Revision: 1413846
>
> URL: http://svn.apache.org/viewvc?rev=1413846&view=rev
> Log:
> Changes for https://issues.apache.org/jira/browse/AMQ-4165 : Remove pure
> master/slave functionality
>
> Removed:
>
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.java
> Modified:
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
>
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
>
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
>
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
>
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
>
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> Mon Nov 26 21:13:25 2012
> @@ -168,7 +168,6 @@ public class BrokerService implements Se
>      private File schedulerDirectoryFile;
>      private Scheduler scheduler;
>      private ThreadPoolExecutor executor;
> -    private boolean slave = true;
>      private int schedulePeriodForDestinationPurge= 0;
>      private int maxPurgedDestinationsPerSweep = 0;
>      private BrokerContext brokerContext;
> @@ -392,13 +391,6 @@ public class BrokerService implements Se
>          return null;
>      }
>
> -    /**
> -     * @return true if this Broker is a slave to a Master
> -     */
> -    public boolean isSlave() {
> -        return slave;
> -    }
> -
>      public void masterFailed() {
>          if (shutdownOnMasterFailure) {
>              LOG.error("The Master has failed ... shutting down");
> @@ -578,7 +570,6 @@ public class BrokerService implements Se
>          if (startException != null) {
>              return;
>          }
> -        slave = false;
>          startDestinations();
>          addShutdownHook();
>
> @@ -604,9 +595,7 @@ public class BrokerService implements Se
>              adminView.setBroker(managedBroker);
>          }
>
> -        if (!isSlave()) {
> -            startAllConnectors();
> -        }
> +        startAllConnectors();
>
>          if (ioExceptionHandler == null) {
>              setIoExceptionHandler(new DefaultIOExceptionHandler());
> @@ -680,7 +669,6 @@ public class BrokerService implements Se
>          try {
>              stopper.stop(persistenceAdapter);
>              persistenceAdapter = null;
> -            slave = true;
>              if (isUseJmx()) {
>                  stopper.stop(getManagementContext());
>                  managementContext = null;
> @@ -1227,8 +1215,7 @@ public class BrokerService implements Se
>      }
>
>      /**
> -     * Sets the services associated with this broker such as a
> -     * {@link MasterConnector}
> +     * Sets the services associated with this broker.
>       */
>      public void setServices(Service[] services) {
>          this.services.clear();
> @@ -2246,82 +2233,80 @@ public class BrokerService implements Se
>       * @throws Exception
>       */
>      public void startAllConnectors() throws Exception {
> -        if (!isSlave()) {
> -            Set<ActiveMQDestination> durableDestinations =
> getBroker().getDurableDestinations();
> -            List<TransportConnector> al = new
> ArrayList<TransportConnector>();
> -            for (Iterator<TransportConnector> iter =
> getTransportConnectors().iterator(); iter.hasNext();) {
> -                TransportConnector connector = iter.next();
> -                connector.setBrokerService(this);
> -                al.add(startTransportConnector(connector));
> -            }
> -            if (al.size() > 0) {
> -                // let's clear the transportConnectors list and replace
> it with
> -                // the started transportConnector instances
> -                this.transportConnectors.clear();
> -                setTransportConnectors(al);
> -            }
> -            URI uri = getVmConnectorURI();
> -            Map<String, String> map = new HashMap<String,
> String>(URISupport.parseParameters(uri));
> -            map.put("network", "true");
> -            map.put("async", "false");
> -            uri = URISupport.createURIWithQuery(uri,
> URISupport.createQueryString(map));
> -
> -            if (!stopped.get()) {
> -                ThreadPoolExecutor networkConnectorStartExecutor = null;
> -                if (isNetworkConnectorStartAsync()) {
> -                    // spin up as many threads as needed
> -                    networkConnectorStartExecutor = new
> ThreadPoolExecutor(0, Integer.MAX_VALUE,
> -                            10, TimeUnit.SECONDS, new
> SynchronousQueue<Runnable>(),
> -                            new ThreadFactory() {
> -                                int count=0;
> -                                public Thread newThread(Runnable
> runnable) {
> -                                    Thread thread = new Thread(runnable,
> "NetworkConnector Start Thread-" +(count++));
> -                                    thread.setDaemon(true);
> -                                    return thread;
> -                                }
> -                            });
> -                }
> +        Set<ActiveMQDestination> durableDestinations =
> getBroker().getDurableDestinations();
> +        List<TransportConnector> al = new ArrayList<TransportConnector>();
> +        for (Iterator<TransportConnector> iter =
> getTransportConnectors().iterator(); iter.hasNext();) {
> +            TransportConnector connector = iter.next();
> +            connector.setBrokerService(this);
> +            al.add(startTransportConnector(connector));
> +        }
> +        if (al.size() > 0) {
> +            // let's clear the transportConnectors list and replace it
> with
> +            // the started transportConnector instances
> +            this.transportConnectors.clear();
> +            setTransportConnectors(al);
> +        }
> +        URI uri = getVmConnectorURI();
> +        Map<String, String> map = new HashMap<String,
> String>(URISupport.parseParameters(uri));
> +        map.put("network", "true");
> +        map.put("async", "false");
> +        uri = URISupport.createURIWithQuery(uri,
> URISupport.createQueryString(map));
>
> -                for (Iterator<NetworkConnector> iter =
> getNetworkConnectors().iterator(); iter.hasNext();) {
> -                    final NetworkConnector connector = iter.next();
> -                    connector.setLocalUri(uri);
> -                    connector.setBrokerName(getBrokerName());
> -                    connector.setDurableDestinations(durableDestinations);
> -                    if (getDefaultSocketURIString() != null) {
> -                        connector.setBrokerURL(getDefaultSocketURIString());
> -                    }
> -                    if (networkConnectorStartExecutor != null) {
> -                        networkConnectorStartExecutor.execute(new
> Runnable() {
> -                            public void run() {
> -                                try {
> -                                    LOG.info("Async start of " +
> connector);
> -                                    connector.start();
> -                                } catch(Exception e) {
> -                                    LOG.error("Async start of network
> connector: " + connector + " failed", e);
> -                                }
> +        if (!stopped.get()) {
> +            ThreadPoolExecutor networkConnectorStartExecutor = null;
> +            if (isNetworkConnectorStartAsync()) {
> +                // spin up as many threads as needed
> +                networkConnectorStartExecutor = new ThreadPoolExecutor(0,
> Integer.MAX_VALUE,
> +                        10, TimeUnit.SECONDS, new
> SynchronousQueue<Runnable>(),
> +                        new ThreadFactory() {
> +                            int count=0;
> +                            public Thread newThread(Runnable runnable) {
> +                                Thread thread = new Thread(runnable,
> "NetworkConnector Start Thread-" +(count++));
> +                                thread.setDaemon(true);
> +                                return thread;
>                              }
>                          });
> -                    } else {
> -                        connector.start();
> -                    }
> -                }
> -                if (networkConnectorStartExecutor != null) {
> -                    // executor done when enqueued tasks are complete
> -                    ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
> -                }
> +            }
>
> -                for (Iterator<ProxyConnector> iter =
> getProxyConnectors().iterator(); iter.hasNext();) {
> -                    ProxyConnector connector = iter.next();
> -                    connector.start();
> +            for (Iterator<NetworkConnector> iter =
> getNetworkConnectors().iterator(); iter.hasNext();) {
> +                final NetworkConnector connector = iter.next();
> +                connector.setLocalUri(uri);
> +                connector.setBrokerName(getBrokerName());
> +                connector.setDurableDestinations(durableDestinations);
> +                if (getDefaultSocketURIString() != null) {
> +                    connector.setBrokerURL(getDefaultSocketURIString());
>                  }
> -                for (Iterator<JmsConnector> iter =
> jmsConnectors.iterator(); iter.hasNext();) {
> -                    JmsConnector connector = iter.next();
> +                if (networkConnectorStartExecutor != null) {
> +                    networkConnectorStartExecutor.execute(new Runnable() {
> +                        public void run() {
> +                            try {
> +                                LOG.info("Async start of " + connector);
> +                                connector.start();
> +                            } catch(Exception e) {
> +                                LOG.error("Async start of network
> connector: " + connector + " failed", e);
> +                            }
> +                        }
> +                    });
> +                } else {
>                      connector.start();
>                  }
> -                for (Service service : services) {
> -                    configureService(service);
> -                    service.start();
> -                }
> +            }
> +            if (networkConnectorStartExecutor != null) {
> +                // executor done when enqueued tasks are complete
> +                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
> +            }
> +
> +            for (Iterator<ProxyConnector> iter =
> getProxyConnectors().iterator(); iter.hasNext();) {
> +                ProxyConnector connector = iter.next();
> +                connector.start();
> +            }
> +            for (Iterator<JmsConnector> iter = jmsConnectors.iterator();
> iter.hasNext();) {
> +                JmsConnector connector = iter.next();
> +                connector.start();
> +            }
> +            for (Service service : services) {
> +                configureService(service);
> +                service.start();
>              }
>          }
>      }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
> Mon Nov 26 21:13:25 2012
> @@ -296,13 +296,6 @@ public class ConnectionContext {
>      }
>
>      /**
> -     * @return the slave
> -     */
> -    public boolean isSlave() {
> -        return (this.broker != null &&
> this.broker.getBrokerService().isSlave()) || !this.clientMaster;
> -    }
> -
> -    /**
>       * @return the clientMaster
>       */
>      public boolean isClientMaster() {
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> Mon Nov 26 21:13:25 2012
> @@ -186,10 +186,6 @@ public class BrokerView implements Broke
>          return brokerService.isPersistent();
>      }
>
> -    public boolean isSlave() {
> -        return brokerService.isSlave();
> -    }
> -
>      public void terminateJVM(int exitCode) {
>          System.exit(exitCode);
>      }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> Mon Nov 26 21:13:25 2012
> @@ -115,9 +115,6 @@ public interface BrokerViewMBean extends
>      @MBeanInfo("Messages are synchronized to disk.")
>      boolean isPersistent();
>
> -    @MBeanInfo("Slave broker.")
> -    boolean isSlave();
> -
>      /**
>       * Shuts down the JVM.
>       *
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
> Mon Nov 26 21:13:25 2012
> @@ -116,10 +116,6 @@ public abstract class AbstractSubscripti
>      public void gc() {
>      }
>
> -    public boolean isSlave() {
> -        return broker.getBrokerService().isSlave();
> -    }
> -
>      public ConnectionContext getContext() {
>          return context;
>      }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> Mon Nov 26 21:13:25 2012
> @@ -92,7 +92,7 @@ public abstract class PrefetchSubscripti
>          // The slave should not deliver pull messages.
>          // TODO: when the slave becomes a master, He should send a NULL
> message to all the
>          // consumers to 'wake them up' in case they were waiting for a
> message.
> -        if (getPrefetchSize() == 0 && !isSlave()) {
> +        if (getPrefetchSize() == 0) {
>
>              prefetchExtension.incrementAndGet();
>              final long dispatchCounterBeforePull = dispatchCounter;
> @@ -194,13 +194,12 @@ public abstract class PrefetchSubscripti
>          boolean callDispatchMatched = false;
>          Destination destination = null;
>
> -        if (!isSlave()) {
> -            if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS))
> {
> -                // suppress unexpected ack exception in this expected case
> -                LOG.warn("Ignoring ack received before dispatch; result
> of failover with an outstanding ack. Acked messages will be replayed if
> present on this broker. Ignored ack: " + ack);
> -                return;
> -            }
> +        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
> +            // suppress unexpected ack exception in this expected case
> +            LOG.warn("Ignoring ack received before dispatch; result of
> failover with an outstanding ack. Acked messages will be replayed if
> present on this broker. Ignored ack: " + ack);
> +            return;
>          }
> +
>          if (LOG.isTraceEnabled()) {
>              LOG.trace("ack:" + ack);
>          }
> @@ -413,15 +412,8 @@ public abstract class PrefetchSubscripti
>              destination.wakeup();
>              dispatchPending();
>          } else {
> -            if (isSlave()) {
> -                throw new JMSException(
> -                        "Slave broker out of sync with master:
> Acknowledgment ("
> -                                + ack + ") was not in the dispatch list: "
> -                                + dispatched);
> -            } else {
> -                LOG.debug("Acknowledgment out of sync (Normally occurs
> when failover connection reconnects): "
> -                        + ack);
> -            }
> +            LOG.debug("Acknowledgment out of sync (Normally occurs when
> failover connection reconnects): "
> +                    + ack);
>          }
>      }
>
> @@ -447,11 +439,7 @@ public abstract class PrefetchSubscripti
>                      @Override
>                      public void afterRollback() throws Exception {
>                          synchronized(dispatchLock) {
> -                            if (isSlave()) {
> -                                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
> -                            } else {
> -                                // poisionAck will decrement - otherwise
> still inflight on client
> -                            }
> +                            // poisionAck will decrement - otherwise
> still inflight on client
>                          }
>                      }
>                  });
> @@ -617,53 +605,51 @@ public abstract class PrefetchSubscripti
>      }
>
>      protected void dispatchPending() throws IOException {
> -        if (!isSlave()) {
> -           synchronized(pendingLock) {
> -                try {
> -                    int numberToDispatch = countBeforeFull();
> -                    if (numberToDispatch > 0) {
> -                        setSlowConsumer(false);
> -                        setPendingBatchSize(pending, numberToDispatch);
> -                        int count = 0;
> -                        pending.reset();
> -                        while (pending.hasNext() && !isFull()
> -                                && count < numberToDispatch) {
> -                            MessageReference node = pending.next();
> -                            if (node == null) {
> -                                break;
> -                            }
> +       synchronized(pendingLock) {
> +            try {
> +                int numberToDispatch = countBeforeFull();
> +                if (numberToDispatch > 0) {
> +                    setSlowConsumer(false);
> +                    setPendingBatchSize(pending, numberToDispatch);
> +                    int count = 0;
> +                    pending.reset();
> +                    while (pending.hasNext() && !isFull()
> +                            && count < numberToDispatch) {
> +                        MessageReference node = pending.next();
> +                        if (node == null) {
> +                            break;
> +                        }
> +
> +                        // Synchronize between dispatched list and remove
> of message from pending list
> +                        // related to remove subscription action
> +                        synchronized(dispatchLock) {
> +                            pending.remove();
> +                            node.decrementReferenceCount();
> +                            if( !isDropped(node) && canDispatch(node)) {
>
> -                            // Synchronize between dispatched list and
> remove of message from pending list
> -                            // related to remove subscription action
> -                            synchronized(dispatchLock) {
> -                                pending.remove();
> -                                node.decrementReferenceCount();
> -                                if( !isDropped(node) &&
> canDispatch(node)) {
> -
> -                                    // Message may have been sitting in
> the pending
> -                                    // list a while waiting for the
> consumer to ak the message.
> -                                    if
> (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
> -                                        //increment number to dispatch
> -                                        numberToDispatch++;
> -                                        if (broker.isExpired(node)) {
> -                                            ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
> -                                        }
> -                                        continue;
> +                                // Message may have been sitting in the
> pending
> +                                // list a while waiting for the consumer
> to ak the message.
> +                                if
> (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
> +                                    //increment number to dispatch
> +                                    numberToDispatch++;
> +                                    if (broker.isExpired(node)) {
> +                                        ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
>                                      }
> -                                    dispatch(node);
> -                                    count++;
> +                                    continue;
>                                  }
> +                                dispatch(node);
> +                                count++;
>                              }
>                          }
> -                    } else if (!isSlowConsumer()) {
> -                        setSlowConsumer(true);
> -                        for (Destination dest :destinations) {
> -                            dest.slowConsumer(context, this);
> -                        }
>                      }
> -                } finally {
> -                    pending.release();
> +                } else if (!isSlowConsumer()) {
> +                    setSlowConsumer(true);
> +                    for (Destination dest :destinations) {
> +                        dest.slowConsumer(context, this);
> +                    }
>                  }
> +            } finally {
> +                pending.release();
>              }
>          }
>      }
> @@ -682,42 +668,37 @@ public abstract class PrefetchSubscripti
>          okForAckAsDispatchDone.countDown();
>
>          // No reentrant lock - Patch needed to IndirectMessageReference
> on method lock
> -        if (!isSlave()) {
> -
> -            MessageDispatch md = createMessageDispatch(node, message);
> -            // NULL messages don't count... they don't get Acked.
> -            if (node != QueueMessageReference.NULL_MESSAGE) {
> -                dispatchCounter++;
> -                dispatched.add(node);
> -            } else {
> -                while (true) {
> -                    int currentExtension = prefetchExtension.get();
> -                    int newExtension = Math.max(0, currentExtension - 1);
> -                    if (prefetchExtension.compareAndSet(currentExtension,
> newExtension)) {
> -                        break;
> -                    }
> +        MessageDispatch md = createMessageDispatch(node, message);
> +        // NULL messages don't count... they don't get Acked.
> +        if (node != QueueMessageReference.NULL_MESSAGE) {
> +            dispatchCounter++;
> +            dispatched.add(node);
> +        } else {
> +            while (true) {
> +                int currentExtension = prefetchExtension.get();
> +                int newExtension = Math.max(0, currentExtension - 1);
> +                if (prefetchExtension.compareAndSet(currentExtension,
> newExtension)) {
> +                    break;
>                  }
>              }
> -            if (info.isDispatchAsync()) {
> -                md.setTransmitCallback(new Runnable() {
> +        }
> +        if (info.isDispatchAsync()) {
> +            md.setTransmitCallback(new Runnable() {
>
> -                    public void run() {
> -                        // Since the message gets queued up in async
> dispatch,
> -                        // we don't want to
> -                        // decrease the reference count until it gets put
> on the
> -                        // wire.
> -                        onDispatch(node, message);
> -                    }
> -                });
> -                context.getConnection().dispatchAsync(md);
> -            } else {
> -                context.getConnection().dispatchSync(md);
> -                onDispatch(node, message);
> -            }
> -            return true;
> +                public void run() {
> +                    // Since the message gets queued up in async dispatch,
> +                    // we don't want to
> +                    // decrease the reference count until it gets put on
> the
> +                    // wire.
> +                    onDispatch(node, message);
> +                }
> +            });
> +            context.getConnection().dispatchAsync(md);
>          } else {
> -            return false;
> +            context.getConnection().dispatchSync(md);
> +            onDispatch(node, message);
>          }
> +        return true;
>      }
>
>      protected void onDispatch(final MessageReference node, final Message
> message) {
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
> Mon Nov 26 21:13:25 2012
> @@ -471,13 +471,13 @@ public class Queue extends BaseDestinati
>                  browserDispatches.add(browserDispatch);
>              }
>
> -            if (!(this.optimizedDispatch || isSlave())) {
> +            if (!this.optimizedDispatch) {
>                  wakeup();
>              }
>          }finally {
>              pagedInPendingDispatchLock.writeLock().unlock();
>          }
> -        if (this.optimizedDispatch || isSlave()) {
> +        if (this.optimizedDispatch) {
>              // Outside of dispatchLock() to maintain the lock hierarchy of
>              // iteratingMutex -> dispatchLock. - see
>              // https://issues.apache.org/activemq/browse/AMQ-1878
> @@ -578,13 +578,13 @@ public class Queue extends BaseDestinati
>              }finally {
>                  consumersLock.writeLock().unlock();
>              }
> -            if (!(this.optimizedDispatch || isSlave())) {
> +            if (!this.optimizedDispatch) {
>                  wakeup();
>              }
>          }finally {
>              pagedInPendingDispatchLock.writeLock().unlock();
>          }
> -        if (this.optimizedDispatch || isSlave()) {
> +        if (this.optimizedDispatch) {
>              // Outside of dispatchLock() to maintain the lock hierarchy of
>              // iteratingMutex -> dispatchLock. - see
>              // https://issues.apache.org/activemq/browse/AMQ-1878
> @@ -1704,7 +1704,7 @@ public class Queue extends BaseDestinati
>      }
>
>      public void wakeup() {
> -        if ((optimizedDispatch || isSlave()) && !iterationRunning) {
> +        if (optimizedDispatch && !iterationRunning) {
>              iterate();
>              pendingWakeups.incrementAndGet();
>          } else {
> @@ -1721,10 +1721,6 @@ public class Queue extends BaseDestinati
>          }
>      }
>
> -    private boolean isSlave() {
> -        return broker.getBrokerService().isSlave();
> -    }
> -
>      private void doPageIn(boolean force) throws Exception {
>          PendingList newlyPaged = doPageInForDispatch(force);
>          pagedInPendingDispatchLock.writeLock().lock();
> @@ -1875,7 +1871,7 @@ public class Queue extends BaseDestinati
>          consumersLock.writeLock().lock();
>
>          try {
> -            if (this.consumers.isEmpty() || isSlave()) {
> +            if (this.consumers.isEmpty()) {
>                  // slave dispatch happens in processDispatchNotification
>                  return list;
>              }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
> Mon Nov 26 21:13:25 2012
> @@ -695,10 +695,6 @@ public class RegionBroker extends EmptyB
>          }
>      }
>
> -    public boolean isSlaveBroker() {
> -        return brokerService.isSlave();
> -    }
> -
>      @Override
>      public boolean isStopped() {
>          return !started;
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
> Mon Nov 26 21:13:25 2012
> @@ -109,12 +109,7 @@ public interface Subscription extends Su
>       * @throws Exception
>       */
>      void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception;
> -
> -    /**
> -     * @return true if the broker is currently in slave mode
> -     */
> -    boolean isSlave();
> -
> +
>      /**
>       * @return number of messages pending delivery
>       */
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
> Mon Nov 26 21:13:25 2012
> @@ -101,7 +101,7 @@ public class TopicSubscription extends A
>              return;
>          }
>          enqueueCounter.incrementAndGet();
> -        if (!isFull() && matched.isEmpty() && !isSlave()) {
> +        if (!isFull() && matched.isEmpty()) {
>              // if maximumPendingMessages is set we will only discard messages which
>              // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>              dispatch(node);
> @@ -299,7 +299,7 @@ public class TopicSubscription extends A
>      public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
>
>          // The slave should not deliver pull messages.
> -        if (getPrefetchSize() == 0 && !isSlave()) {
> +        if (getPrefetchSize() == 0 ) {
>
>              prefetchWindowOpen.set(true);
>              dispatchMatched();
>
> Modified:
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
> (original)
> +++
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
> Mon Nov 26 21:13:25 2012
> @@ -338,7 +338,6 @@ public class MBeanTest extends EmbeddedB
>          ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
>          BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
>
> -        assertTrue("broker is not a slave", !broker.isSlave());
>          // create 2 topics
>          broker.addTopic(getDestinationString() + "1 ");
>          broker.addTopic(" " + getDestinationString() + "2");
> @@ -536,7 +535,6 @@ public class MBeanTest extends EmbeddedB
>          ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
>          BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
>
> -        assertTrue("broker is not a slave", !broker.isSlave());
>          // create 2 topics
>          broker.addTopic(getDestinationString() + "1");
>          broker.addTopic(getDestinationString() + "2");
> @@ -588,7 +586,6 @@ public class MBeanTest extends EmbeddedB
>          ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
>          BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
>
> -        assertTrue("broker is not a slave", !broker.isSlave());
>          // create 2 topics
>          broker.addTopic(getDestinationString() + "1");
>          broker.addTopic(getDestinationString() + "2");
> @@ -797,7 +794,6 @@ public class MBeanTest extends EmbeddedB
>          ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
>          BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
>
> -        assertTrue("broker is not a slave", !broker.isSlave());
>          assertEquals(0, broker.getDynamicDestinationProducers().length);
>
>          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>
> Modified:
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
> (original)
> +++
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
> Mon Nov 26 21:13:25 2012
> @@ -234,10 +234,6 @@ public class QueueDuplicatesFromStoreTes
>                  return false;
>              }
>
> -            public boolean isSlave() {
> -                return false;
> -            }
> -
>              public boolean matches(MessageReference node,
>                      MessageEvaluationContext context) throws IOException {
>                  return true;
>
> Modified:
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
> (original)
> +++
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
> Mon Nov 26 21:13:25 2012
> @@ -66,7 +66,6 @@ public class ApplicationContextFilter im
>      private String applicationContextName = "applicationContext";
>      private String requestContextName = "requestContext";
>      private String requestName = "request";
> -    private final String slavePage = "slave.jsp";
>
>      public void init(FilterConfig config) throws ServletException {
>          this.servletContext = config.getServletContext();
> @@ -85,19 +84,19 @@ public class ApplicationContextFilter im
>          Map requestContextWrapper = createRequestContextWrapper(request);
>          String path = ((HttpServletRequest)request).getRequestURI();
>          // handle slave brokers
> -        try {
> -            if ( !(path.endsWith("css") || path.endsWith("png") || path.endsWith("ico") || path.endsWith(slavePage))
> -                    && ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
> -                ((HttpServletResponse)response).sendRedirect(slavePage);
> -                return;
> -            }
> -        } catch (Exception e) {
> -            LOG.warn(path + ", failed to access BrokerFacade: reason: " + e.getLocalizedMessage());
> -            if (LOG.isDebugEnabled()) {
> -                LOG.debug(request.toString(), e);
> -            }
> -            throw new IOException(e);
> -        }
> +//        try {
> +//            if ( !(path.endsWith("css") || path.endsWith("png") || path.endsWith("ico") || path.endsWith(slavePage))
> +//                    && ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
> +//                ((HttpServletResponse)response).sendRedirect(slavePage);
> +//                return;
> +//            }
> +//        } catch (Exception e) {
> +//            LOG.warn(path + ", failed to access BrokerFacade: reason: " + e.getLocalizedMessage());
> +//            if (LOG.isDebugEnabled()) {
> +//                LOG.debug(request.toString(), e);
> +//            }
> +//            throw new IOException(e);
> +//        }
>          request.setAttribute(requestContextName, requestContextWrapper);
>          request.setAttribute(requestName, request);
>          chain.doFilter(request, response);
>
> Modified:
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
> (original)
> +++
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
> Mon Nov 26 21:13:25 2012
> @@ -209,6 +209,4 @@ public interface BrokerFacade {
>
>      boolean isJobSchedulerStarted();
>
> -    boolean isSlave() throws Exception;
> -
>  }
> \ No newline at end of file
>
> Modified:
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
> (original)
> +++
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
> Mon Nov 26 21:13:25 2012
> @@ -226,8 +226,4 @@ public abstract class BrokerFacadeSuppor
>              return false;
>          }
>      }
> -
> -    public boolean isSlave() throws Exception {
> -        return getBrokerAdmin().isSlave();
> -    }
>  }
>
>
>


-- 
http://redhat.com
http://blog.garytully.com
