activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1413846 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/test/j...
Date Mon, 26 Nov 2012 21:13:30 GMT
Author: chirino
Date: Mon Nov 26 21:13:25 2012
New Revision: 1413846

URL: http://svn.apache.org/viewvc?rev=1413846&view=rev
Log:
Changes for https://issues.apache.org/jira/browse/AMQ-4165 : Remove pure master/slave functionality

Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
    activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
Mon Nov 26 21:13:25 2012
@@ -168,7 +168,6 @@ public class BrokerService implements Se
     private File schedulerDirectoryFile;
     private Scheduler scheduler;
     private ThreadPoolExecutor executor;
-    private boolean slave = true;
     private int schedulePeriodForDestinationPurge= 0;
     private int maxPurgedDestinationsPerSweep = 0;
     private BrokerContext brokerContext;
@@ -392,13 +391,6 @@ public class BrokerService implements Se
         return null;
     }
 
-    /**
-     * @return true if this Broker is a slave to a Master
-     */
-    public boolean isSlave() {
-        return slave;
-    }
-
     public void masterFailed() {
         if (shutdownOnMasterFailure) {
             LOG.error("The Master has failed ... shutting down");
@@ -578,7 +570,6 @@ public class BrokerService implements Se
         if (startException != null) {
             return;
         }
-        slave = false;
         startDestinations();
         addShutdownHook();
 
@@ -604,9 +595,7 @@ public class BrokerService implements Se
             adminView.setBroker(managedBroker);
         }
 
-        if (!isSlave()) {
-            startAllConnectors();
-        }
+        startAllConnectors();
 
         if (ioExceptionHandler == null) {
             setIoExceptionHandler(new DefaultIOExceptionHandler());
@@ -680,7 +669,6 @@ public class BrokerService implements Se
         try {
             stopper.stop(persistenceAdapter);
             persistenceAdapter = null;
-            slave = true;
             if (isUseJmx()) {
                 stopper.stop(getManagementContext());
                 managementContext = null;
@@ -1227,8 +1215,7 @@ public class BrokerService implements Se
     }
 
     /**
-     * Sets the services associated with this broker such as a
-     * {@link MasterConnector}
+     * Sets the services associated with this broker.
      */
     public void setServices(Service[] services) {
         this.services.clear();
@@ -2246,82 +2233,80 @@ public class BrokerService implements Se
      * @throws Exception
      */
     public void startAllConnectors() throws Exception {
-        if (!isSlave()) {
-            Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
-            List<TransportConnector> al = new ArrayList<TransportConnector>();
-            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator();
iter.hasNext();) {
-                TransportConnector connector = iter.next();
-                connector.setBrokerService(this);
-                al.add(startTransportConnector(connector));
-            }
-            if (al.size() > 0) {
-                // let's clear the transportConnectors list and replace it with
-                // the started transportConnector instances
-                this.transportConnectors.clear();
-                setTransportConnectors(al);
-            }
-            URI uri = getVmConnectorURI();
-            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
-            map.put("network", "true");
-            map.put("async", "false");
-            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-
-            if (!stopped.get()) {
-                ThreadPoolExecutor networkConnectorStartExecutor = null;
-                if (isNetworkConnectorStartAsync()) {
-                    // spin up as many threads as needed
-                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-                            10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-                            new ThreadFactory() {
-                                int count=0;
-                                public Thread newThread(Runnable runnable) {
-                                    Thread thread = new Thread(runnable, "NetworkConnector
Start Thread-" +(count++));
-                                    thread.setDaemon(true);
-                                    return thread;
-                                }
-                            });
-                }
+        Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
+        List<TransportConnector> al = new ArrayList<TransportConnector>();
+        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator();
iter.hasNext();) {
+            TransportConnector connector = iter.next();
+            connector.setBrokerService(this);
+            al.add(startTransportConnector(connector));
+        }
+        if (al.size() > 0) {
+            // let's clear the transportConnectors list and replace it with
+            // the started transportConnector instances
+            this.transportConnectors.clear();
+            setTransportConnectors(al);
+        }
+        URI uri = getVmConnectorURI();
+        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
+        map.put("network", "true");
+        map.put("async", "false");
+        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
 
-                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator();
iter.hasNext();) {
-                    final NetworkConnector connector = iter.next();
-                    connector.setLocalUri(uri);
-                    connector.setBrokerName(getBrokerName());
-                    connector.setDurableDestinations(durableDestinations);
-                    if (getDefaultSocketURIString() != null) {
-                        connector.setBrokerURL(getDefaultSocketURIString());
-                    }
-                    if (networkConnectorStartExecutor != null) {
-                        networkConnectorStartExecutor.execute(new Runnable() {
-                            public void run() {
-                                try {
-                                    LOG.info("Async start of " + connector);
-                                    connector.start();
-                                } catch(Exception e) {
-                                    LOG.error("Async start of network connector: " + connector
+ " failed", e);
-                                }
+        if (!stopped.get()) {
+            ThreadPoolExecutor networkConnectorStartExecutor = null;
+            if (isNetworkConnectorStartAsync()) {
+                // spin up as many threads as needed
+                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+                        10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+                        new ThreadFactory() {
+                            int count=0;
+                            public Thread newThread(Runnable runnable) {
+                                Thread thread = new Thread(runnable, "NetworkConnector Start
Thread-" +(count++));
+                                thread.setDaemon(true);
+                                return thread;
                             }
                         });
-                    } else {
-                        connector.start();
-                    }
-                }
-                if (networkConnectorStartExecutor != null) {
-                    // executor done when enqueued tasks are complete
-                    ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
-                }
+            }
 
-                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator();
iter.hasNext();) {
-                    ProxyConnector connector = iter.next();
-                    connector.start();
+            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator();
iter.hasNext();) {
+                final NetworkConnector connector = iter.next();
+                connector.setLocalUri(uri);
+                connector.setBrokerName(getBrokerName());
+                connector.setDurableDestinations(durableDestinations);
+                if (getDefaultSocketURIString() != null) {
+                    connector.setBrokerURL(getDefaultSocketURIString());
                 }
-                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();)
{
-                    JmsConnector connector = iter.next();
+                if (networkConnectorStartExecutor != null) {
+                    networkConnectorStartExecutor.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                LOG.info("Async start of " + connector);
+                                connector.start();
+                            } catch(Exception e) {
+                                LOG.error("Async start of network connector: " + connector
+ " failed", e);
+                            }
+                        }
+                    });
+                } else {
                     connector.start();
                 }
-                for (Service service : services) {
-                    configureService(service);
-                    service.start();
-                }
+            }
+            if (networkConnectorStartExecutor != null) {
+                // executor done when enqueued tasks are complete
+                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
+            }
+
+            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();)
{
+                ProxyConnector connector = iter.next();
+                connector.start();
+            }
+            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();)
{
+                JmsConnector connector = iter.next();
+                connector.start();
+            }
+            for (Service service : services) {
+                configureService(service);
+                service.start();
             }
         }
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Mon Nov 26 21:13:25 2012
@@ -296,13 +296,6 @@ public class ConnectionContext {
     }
 
     /**
-     * @return the slave
-     */
-    public boolean isSlave() {
-        return (this.broker != null && this.broker.getBrokerService().isSlave())
|| !this.clientMaster;
-    }
-
-    /**
      * @return the clientMaster
      */
     public boolean isClientMaster() {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Mon Nov 26 21:13:25 2012
@@ -186,10 +186,6 @@ public class BrokerView implements Broke
         return brokerService.isPersistent();
     }
 
-    public boolean isSlave() {
-        return brokerService.isSlave();
-    }
-
     public void terminateJVM(int exitCode) {
         System.exit(exitCode);
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
Mon Nov 26 21:13:25 2012
@@ -115,9 +115,6 @@ public interface BrokerViewMBean extends
     @MBeanInfo("Messages are synchronized to disk.")
     boolean isPersistent();
 
-    @MBeanInfo("Slave broker.")
-    boolean isSlave();
-
     /**
      * Shuts down the JVM.
      *

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Mon Nov 26 21:13:25 2012
@@ -116,10 +116,6 @@ public abstract class AbstractSubscripti
     public void gc() {
     }
 
-    public boolean isSlave() {
-        return broker.getBrokerService().isSlave();
-    }
-
     public ConnectionContext getContext() {
         return context;
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Nov 26 21:13:25 2012
@@ -92,7 +92,7 @@ public abstract class PrefetchSubscripti
         // The slave should not deliver pull messages.
         // TODO: when the slave becomes a master, He should send a NULL message to all the
         // consumers to 'wake them up' in case they were waiting for a message.
-        if (getPrefetchSize() == 0 && !isSlave()) {
+        if (getPrefetchSize() == 0) {
 
             prefetchExtension.incrementAndGet();
             final long dispatchCounterBeforePull = dispatchCounter;
@@ -194,13 +194,12 @@ public abstract class PrefetchSubscripti
         boolean callDispatchMatched = false;
         Destination destination = null;
 
-        if (!isSlave()) {
-            if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
-                // suppress unexpected ack exception in this expected case
-                LOG.warn("Ignoring ack received before dispatch; result of failover with
an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack:
" + ack);
-                return;
-            }
+        if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
+            // suppress unexpected ack exception in this expected case
+            LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding
ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
+            return;
         }
+
         if (LOG.isTraceEnabled()) {
             LOG.trace("ack:" + ack);
         }
@@ -413,15 +412,8 @@ public abstract class PrefetchSubscripti
             destination.wakeup();
             dispatchPending();
         } else {
-            if (isSlave()) {
-                throw new JMSException(
-                        "Slave broker out of sync with master: Acknowledgment ("
-                                + ack + ") was not in the dispatch list: "
-                                + dispatched);
-            } else {
-                LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection
reconnects): "
-                        + ack);
-            }
+            LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection
reconnects): "
+                    + ack);
         }
     }
 
@@ -447,11 +439,7 @@ public abstract class PrefetchSubscripti
                     @Override
                     public void afterRollback() throws Exception {
                         synchronized(dispatchLock) {
-                            if (isSlave()) {
-                                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
-                            } else {
-                                // poisionAck will decrement - otherwise still inflight on
client
-                            }
+                            // poisionAck will decrement - otherwise still inflight on client
                         }
                     }
                 });
@@ -617,53 +605,51 @@ public abstract class PrefetchSubscripti
     }
 
     protected void dispatchPending() throws IOException {
-        if (!isSlave()) {
-           synchronized(pendingLock) {
-                try {
-                    int numberToDispatch = countBeforeFull();
-                    if (numberToDispatch > 0) {
-                        setSlowConsumer(false);
-                        setPendingBatchSize(pending, numberToDispatch);
-                        int count = 0;
-                        pending.reset();
-                        while (pending.hasNext() && !isFull()
-                                && count < numberToDispatch) {
-                            MessageReference node = pending.next();
-                            if (node == null) {
-                                break;
-                            }
+       synchronized(pendingLock) {
+            try {
+                int numberToDispatch = countBeforeFull();
+                if (numberToDispatch > 0) {
+                    setSlowConsumer(false);
+                    setPendingBatchSize(pending, numberToDispatch);
+                    int count = 0;
+                    pending.reset();
+                    while (pending.hasNext() && !isFull()
+                            && count < numberToDispatch) {
+                        MessageReference node = pending.next();
+                        if (node == null) {
+                            break;
+                        }
+
+                        // Synchronize between dispatched list and remove of message from
pending list
+                        // related to remove subscription action
+                        synchronized(dispatchLock) {
+                            pending.remove();
+                            node.decrementReferenceCount();
+                            if( !isDropped(node) && canDispatch(node)) {
 
-                            // Synchronize between dispatched list and remove of message
from pending list
-                            // related to remove subscription action
-                            synchronized(dispatchLock) {
-                                pending.remove();
-                                node.decrementReferenceCount();
-                                if( !isDropped(node) && canDispatch(node)) {
-
-                                    // Message may have been sitting in the pending
-                                    // list a while waiting for the consumer to ak the message.
-                                    if (node!=QueueMessageReference.NULL_MESSAGE &&
node.isExpired()) {
-                                        //increment number to dispatch
-                                        numberToDispatch++;
-                                        if (broker.isExpired(node)) {
-                                            ((Destination)node.getRegionDestination()).messageExpired(context,
this, node);
-                                        }
-                                        continue;
+                                // Message may have been sitting in the pending
+                                // list a while waiting for the consumer to ak the message.
+                                if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired())
{
+                                    //increment number to dispatch
+                                    numberToDispatch++;
+                                    if (broker.isExpired(node)) {
+                                        ((Destination)node.getRegionDestination()).messageExpired(context,
this, node);
                                     }
-                                    dispatch(node);
-                                    count++;
+                                    continue;
                                 }
+                                dispatch(node);
+                                count++;
                             }
                         }
-                    } else if (!isSlowConsumer()) {
-                        setSlowConsumer(true);
-                        for (Destination dest :destinations) {
-                            dest.slowConsumer(context, this);
-                        }
                     }
-                } finally {
-                    pending.release();
+                } else if (!isSlowConsumer()) {
+                    setSlowConsumer(true);
+                    for (Destination dest :destinations) {
+                        dest.slowConsumer(context, this);
+                    }
                 }
+            } finally {
+                pending.release();
             }
         }
     }
@@ -682,42 +668,37 @@ public abstract class PrefetchSubscripti
         okForAckAsDispatchDone.countDown();
 
         // No reentrant lock - Patch needed to IndirectMessageReference on method lock
-        if (!isSlave()) {
-
-            MessageDispatch md = createMessageDispatch(node, message);
-            // NULL messages don't count... they don't get Acked.
-            if (node != QueueMessageReference.NULL_MESSAGE) {
-                dispatchCounter++;
-                dispatched.add(node);
-            } else {
-                while (true) {
-                    int currentExtension = prefetchExtension.get();
-                    int newExtension = Math.max(0, currentExtension - 1);
-                    if (prefetchExtension.compareAndSet(currentExtension, newExtension))
{
-                        break;
-                    }
+        MessageDispatch md = createMessageDispatch(node, message);
+        // NULL messages don't count... they don't get Acked.
+        if (node != QueueMessageReference.NULL_MESSAGE) {
+            dispatchCounter++;
+            dispatched.add(node);
+        } else {
+            while (true) {
+                int currentExtension = prefetchExtension.get();
+                int newExtension = Math.max(0, currentExtension - 1);
+                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                    break;
                 }
             }
-            if (info.isDispatchAsync()) {
-                md.setTransmitCallback(new Runnable() {
+        }
+        if (info.isDispatchAsync()) {
+            md.setTransmitCallback(new Runnable() {
 
-                    public void run() {
-                        // Since the message gets queued up in async dispatch,
-                        // we don't want to
-                        // decrease the reference count until it gets put on the
-                        // wire.
-                        onDispatch(node, message);
-                    }
-                });
-                context.getConnection().dispatchAsync(md);
-            } else {
-                context.getConnection().dispatchSync(md);
-                onDispatch(node, message);
-            }
-            return true;
+                public void run() {
+                    // Since the message gets queued up in async dispatch,
+                    // we don't want to
+                    // decrease the reference count until it gets put on the
+                    // wire.
+                    onDispatch(node, message);
+                }
+            });
+            context.getConnection().dispatchAsync(md);
         } else {
-            return false;
+            context.getConnection().dispatchSync(md);
+            onDispatch(node, message);
         }
+        return true;
     }
 
     protected void onDispatch(final MessageReference node, final Message message) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Nov 26 21:13:25 2012
@@ -471,13 +471,13 @@ public class Queue extends BaseDestinati
                 browserDispatches.add(browserDispatch);
             }
 
-            if (!(this.optimizedDispatch || isSlave())) {
+            if (!this.optimizedDispatch) {
                 wakeup();
             }
         }finally {
             pagedInPendingDispatchLock.writeLock().unlock();
         }
-        if (this.optimizedDispatch || isSlave()) {
+        if (this.optimizedDispatch) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
             // iteratingMutex -> dispatchLock. - see
             // https://issues.apache.org/activemq/browse/AMQ-1878
@@ -578,13 +578,13 @@ public class Queue extends BaseDestinati
             }finally {
                 consumersLock.writeLock().unlock();
             }
-            if (!(this.optimizedDispatch || isSlave())) {
+            if (!this.optimizedDispatch) {
                 wakeup();
             }
         }finally {
             pagedInPendingDispatchLock.writeLock().unlock();
         }
-        if (this.optimizedDispatch || isSlave()) {
+        if (this.optimizedDispatch) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
             // iteratingMutex -> dispatchLock. - see
             // https://issues.apache.org/activemq/browse/AMQ-1878
@@ -1704,7 +1704,7 @@ public class Queue extends BaseDestinati
     }
 
     public void wakeup() {
-        if ((optimizedDispatch || isSlave()) && !iterationRunning) {
+        if (optimizedDispatch && !iterationRunning) {
             iterate();
             pendingWakeups.incrementAndGet();
         } else {
@@ -1721,10 +1721,6 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private boolean isSlave() {
-        return broker.getBrokerService().isSlave();
-    }
-
     private void doPageIn(boolean force) throws Exception {
         PendingList newlyPaged = doPageInForDispatch(force);
         pagedInPendingDispatchLock.writeLock().lock();
@@ -1875,7 +1871,7 @@ public class Queue extends BaseDestinati
         consumersLock.writeLock().lock();
 
         try {
-            if (this.consumers.isEmpty() || isSlave()) {
+            if (this.consumers.isEmpty()) {
                 // slave dispatch happens in processDispatchNotification
                 return list;
             }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Nov 26 21:13:25 2012
@@ -695,10 +695,6 @@ public class RegionBroker extends EmptyB
         }
     }
 
-    public boolean isSlaveBroker() {
-        return brokerService.isSlave();
-    }
-
     @Override
     public boolean isStopped() {
         return !started;

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
Mon Nov 26 21:13:25 2012
@@ -109,12 +109,7 @@ public interface Subscription extends Su
      * @throws Exception 
      */
     void processMessageDispatchNotification(MessageDispatchNotification  mdn) throws Exception;
-    
-    /**
-     * @return true if the broker is currently in slave mode
-     */
-    boolean isSlave();
-    
+
     /**
      * @return number of messages pending delivery
      */

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Nov 26 21:13:25 2012
@@ -101,7 +101,7 @@ public class TopicSubscription extends A
             return;
         }
         enqueueCounter.incrementAndGet();
-        if (!isFull() && matched.isEmpty() && !isSlave()) {
+        if (!isFull() && matched.isEmpty()) {
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
             dispatch(node);
@@ -299,7 +299,7 @@ public class TopicSubscription extends A
     public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception
{
 
         // The slave should not deliver pull messages.
-        if (getPrefetchSize() == 0 && !isSlave()) {
+        if (getPrefetchSize() == 0 ) {
 
             prefetchWindowOpen.set(true);
             dispatchMatched();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Mon Nov 26 21:13:25 2012
@@ -338,7 +338,6 @@ public class MBeanTest extends EmbeddedB
         ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
         BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
-        assertTrue("broker is not a slave", !broker.isSlave());
         // create 2 topics
         broker.addTopic(getDestinationString() + "1 ");
         broker.addTopic(" " + getDestinationString() + "2");
@@ -536,7 +535,6 @@ public class MBeanTest extends EmbeddedB
         ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
         BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
-        assertTrue("broker is not a slave", !broker.isSlave());
         // create 2 topics
         broker.addTopic(getDestinationString() + "1");
         broker.addTopic(getDestinationString() + "2");
@@ -588,7 +586,6 @@ public class MBeanTest extends EmbeddedB
         ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
         BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
-        assertTrue("broker is not a slave", !broker.isSlave());
         // create 2 topics
         broker.addTopic(getDestinationString() + "1");
         broker.addTopic(getDestinationString() + "2");
@@ -797,7 +794,6 @@ public class MBeanTest extends EmbeddedB
         ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
         BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
 
-        assertTrue("broker is not a slave", !broker.isSlave());
         assertEquals(0, broker.getDynamicDestinationProducers().length);
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Mon Nov 26 21:13:25 2012
@@ -234,10 +234,6 @@ public class QueueDuplicatesFromStoreTes
                 return false;
             }
 
-            public boolean isSlave() {
-                return false;
-            }
-
             public boolean matches(MessageReference node,
                     MessageEvaluationContext context) throws IOException {
                 return true;

Modified: activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
(original)
+++ activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
Mon Nov 26 21:13:25 2012
@@ -66,7 +66,6 @@ public class ApplicationContextFilter im
     private String applicationContextName = "applicationContext";
     private String requestContextName = "requestContext";
     private String requestName = "request";
-    private final String slavePage = "slave.jsp";
 
     public void init(FilterConfig config) throws ServletException {
         this.servletContext = config.getServletContext();
@@ -85,19 +84,19 @@ public class ApplicationContextFilter im
         Map requestContextWrapper = createRequestContextWrapper(request);
         String path = ((HttpServletRequest)request).getRequestURI();
         // handle slave brokers
-        try {
-            if ( !(path.endsWith("css") || path.endsWith("png") || path.endsWith("ico") ||
path.endsWith(slavePage))
-                    && ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave())
{
-                ((HttpServletResponse)response).sendRedirect(slavePage);
-                return;
-            }
-        } catch (Exception e) {
-            LOG.warn(path + ", failed to access BrokerFacade: reason: " + e.getLocalizedMessage());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(request.toString(), e);
-            }
-            throw new IOException(e);
-        }
+//        try {
+//            if ( !(path.endsWith("css") || path.endsWith("png") || path.endsWith("ico")
|| path.endsWith(slavePage))
+//                    && ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave())
{
+//                ((HttpServletResponse)response).sendRedirect(slavePage);
+//                return;
+//            }
+//        } catch (Exception e) {
+//            LOG.warn(path + ", failed to access BrokerFacade: reason: " + e.getLocalizedMessage());
+//            if (LOG.isDebugEnabled()) {
+//                LOG.debug(request.toString(), e);
+//            }
+//            throw new IOException(e);
+//        }
         request.setAttribute(requestContextName, requestContextWrapper);
         request.setAttribute(requestName, request);
         chain.doFilter(request, response);

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java Mon
Nov 26 21:13:25 2012
@@ -209,6 +209,4 @@ public interface BrokerFacade {
 
     boolean isJobSchedulerStarted();
 
-    boolean isSlave() throws Exception;
-
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
(original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
Mon Nov 26 21:13:25 2012
@@ -226,8 +226,4 @@ public abstract class BrokerFacadeSuppor
             return false;
         }
     }
-
-    public boolean isSlave() throws Exception {
-        return getBrokerAdmin().isSlave();
-    }
 }



Mime
View raw message