activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r557748 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/region/ command/
Date Thu, 19 Jul 2007 19:24:33 GMT
Author: rajdavies
Date: Thu Jul 19 12:24:31 2007
New Revision: 557748

URL: http://svn.apache.org/viewvc?view=rev&rev=557748
Log:
Move the decision about being a slave from the Broker to the ConnectionContext, so that it can
be made on a per-connection basis if required.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu
Jul 19 12:24:31 2007
@@ -191,12 +191,7 @@
      * @param messageDispatch
      */
     public void processDispatch(MessageDispatch messageDispatch);
-    
-    /**
-     * @return true if the broker is running as a slave
-     */
-    public boolean isSlaveBroker();
-    
+  
     /**
      * @return true if the broker has stopped
      */
@@ -229,7 +224,7 @@
      * @return true if fault tolerant
      */
     public boolean isFaultTolerantConfiguration();
-
+    
     /**
      * @return the connection context used to make administration operations on startup or via JMX MBeans
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Thu Jul 19 12:24:31 2007
@@ -200,11 +200,7 @@
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{
         next.processDispatchNotification(messageDispatchNotification);
     }
-    
-    public boolean isSlaveBroker(){
-        return next.isSlaveBroker();
-    }
-    
+        
     public boolean isStopped(){
         return next.isStopped();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Jul 19 12:24:31 2007
@@ -154,6 +154,7 @@
     private boolean useLocalHostBrokerName = false;
     private CountDownLatch stoppedLatch = new CountDownLatch(1);
     private boolean supportFailOver = false;
+    private boolean clustered = false;
 
     static{
         String localHostName = "localhost";
@@ -1120,6 +1121,20 @@
     public void setSupportFailOver(boolean supportFailOver){
         this.supportFailOver=supportFailOver;
     }    
+    
+    /**
+     * @return the clustered
+     */
+    public boolean isClustered(){
+        return this.clustered;
+    }
+
+    /**
+     * @param clustered the clustered to set
+     */
+    public void setClustered(boolean clustered){
+        this.clustered=clustered;
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -1697,6 +1712,5 @@
                 broker.addDestination(adminConnectionContext, destination);
             }
         }
-    }
-   
+    }   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Thu Jul 19 12:24:31 2007
@@ -59,6 +59,7 @@
     private final AtomicBoolean stopping = new AtomicBoolean();
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
 	private boolean dontSendReponse;
+    private boolean clientMaster=true;
     
     public ConnectionContext() {
     }
@@ -267,6 +268,29 @@
 
 	public boolean isDontSendReponse() {
 		return dontSendReponse;
-	}	
-	
+	}
+
+    
+    /**
+     * @return the slave
+     */
+    public boolean isSlave(){
+        return (this.broker!=null&&this.broker.getBrokerService().isSlave())||!this.clientMaster;
+    }
+
+    
+    /**
+     * @return the clientMaster
+     */
+    public boolean isClientMaster(){
+        return this.clientMaster;
+    }
+
+    
+    /**
+     * @param clientMaster the clientMaster to set
+     */
+    public void setClientMaster(boolean clientMaster){
+        this.clientMaster=clientMaster;
+    }	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Thu Jul 19 12:24:31 2007
@@ -199,10 +199,7 @@
 
     }
 
-    public boolean isSlaveBroker() {
-        return false;
-    }
-
+    
     public boolean isStopped() {
         return false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Thu Jul 19 12:24:31 2007
@@ -197,10 +197,7 @@
         throw new BrokerStoppedException(this.message);
     }
 
-    public boolean isSlaveBroker() {
-        throw new BrokerStoppedException(this.message);
-    }
-
+   
     public boolean isStopped() {
         return true;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Thu Jul 19 12:24:31 2007
@@ -209,10 +209,7 @@
         getNext().processDispatchNotification(messageDispatchNotification);
     }
     
-    public boolean isSlaveBroker(){
-        return getNext().isSlaveBroker();
-    }
-    
+       
     public boolean isStopped(){
         return getNext().isStopped();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Thu Jul 19 12:24:31 2007
@@ -658,6 +658,7 @@
         context.setClientId(clientId);
         context.setUserName(info.getUserName());
         context.setConnectionId(info.getConnectionId());
+        context.setClientMaster(info.isClientMaster());
         context.setWireFormatInfo(wireFormatInfo);
         context.setNetworkConnection(networkConnection);
         context.incrementReference();
@@ -1199,18 +1200,19 @@
         }
     }
     
-	protected void disposeTransport() {
-    	if( transportDisposed.compareAndSet(false, true) ) {
-        try {
-			transport.stop();
-			active = false;
-			log.debug("Stopped connection: "+transport.getRemoteAddress());
-		} catch (Exception e) {
-			log.debug("Could not stop transport: "+e,e);
-		}
-    	}
-	}
-	
+	protected void disposeTransport(){
+        if(transportDisposed.compareAndSet(false,true)){
+            try{
+                transport.stop();
+                active=false;
+                log.debug("Stopped connection: "+transport.getRemoteAddress());
+            }catch(Exception e){
+                log.debug("Could not stop transport: "+e,e);
+            }
+        }
+    }
+    
+   	
 	public int getProtocolVersion() {
 		return protocolVersion.get();
 	}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Thu Jul 19 12:24:31 2007
@@ -115,8 +115,8 @@
     public void gc() {        
     }
     
-    public boolean isSlaveBroker(){
-        return broker.isSlaveBroker();
+    public boolean isSlave(){
+        return getContext().isSlave();
     }
 
     public ConnectionContext getContext() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Jul 19 12:24:31 2007
@@ -74,7 +74,7 @@
         // The slave should not deliver pull messages. TODO: when the slave becomes a master,
         // He should send a NULL message to all the consumers to 'wake them up' in case
         // they were waiting for a message.
-        if(getPrefetchSize()==0&&!isSlaveBroker()){
+        if(getPrefetchSize()==0&&!isSlave()){
             prefetchExtension++;
             final long dispatchCounterBeforePull=dispatchCounter;
             dispatchMatched();
@@ -119,7 +119,7 @@
         pendingEmpty=pending.isEmpty();
         enqueueCounter++;
        
-        if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
+        if(!isFull()&&pendingEmpty&&!isSlave()){
             dispatch(node);
         }else{
             optimizePrefetch();
@@ -260,7 +260,7 @@
         if(callDispatchMatched){
             dispatchMatched();
         }else{
-            if(isSlaveBroker()){
+            if(isSlave()){
                 throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack
                         +") was not in the dispatch list: "+dispatched);
             }else{
@@ -295,7 +295,7 @@
      * @return
      */
     protected synchronized boolean isFull(){
-        return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
+        return isSlave()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
 
     /**
@@ -377,7 +377,7 @@
     }
 
     protected synchronized void dispatchMatched() throws IOException{
-        if(!broker.isSlaveBroker()){
+        if(!isSlave()){
             try{
                 int numberToDispatch=countBeforeFull();
                 if(numberToDispatch>0){
@@ -412,7 +412,7 @@
             return false;
         }
         // Make sure we can dispatch a message.
-        if(canDispatch(node)&&!isSlaveBroker()){
+        if(canDispatch(node)&&!isSlave()){
             MessageDispatch md=createMessageDispatch(node,message);
             // NULL messages don't count... they don't get Acked.
             if(node!=QueueMessageReference.NULL_MESSAGE){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Thu Jul 19 12:24:31 2007
@@ -111,7 +111,7 @@
     /**
      * @return true if the broker is currently in slave mode
      */
-    boolean isSlaveBroker();
+    boolean isSlave();
     
     /**
      * @return number of messages pending delivery

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Jul 19 12:24:31 2007
@@ -74,7 +74,7 @@
     public void add(MessageReference node) throws Exception{
         enqueueCounter.incrementAndGet();
         node.incrementReferenceCount();
-        if(!isFull()&&!isSlaveBroker()){
+        if(!isFull()&&!isSlave()){
             optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages which
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
Thu Jul 19 12:24:31 2007
@@ -36,7 +36,7 @@
     protected BrokerId[] brokerPath;
     protected boolean brokerMasterConnector;
     protected boolean manageable;
-    protected boolean clientMaster;
+    protected boolean clientMaster=true;
     protected transient Object transportContext; 
     
     public ConnectionInfo() {        



Mime
View raw message