qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1157566 [23/23] - in /qpid: branches/rg-amqp-1-0-sandbox/qpid/java/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/...
Date Sun, 14 Aug 2011 17:15:08 GMT
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
Sun Aug 14 17:14:51 2011
@@ -47,11 +47,10 @@ public class AnonymousSaslServerFactory 
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null &&
-            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-             props.containsKey(Sasl.POLICY_NOACTIVE) ||
-             props.containsKey(Sasl.POLICY_NOANONYMOUS)))
+        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+            props.containsKey(Sasl.POLICY_NOACTIVE) ||
+            props.containsKey(Sasl.POLICY_NOANONYMOUS)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
Sun Aug 14 17:14:51 2011
@@ -45,10 +45,9 @@ public class PlainSaslServerFactory impl
 
     public String[] getMechanismNames(Map props)
     {
-        if (props != null &&
-            (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
-             props.containsKey(Sasl.POLICY_NODICTIONARY) ||
-             props.containsKey(Sasl.POLICY_NOACTIVE)))
+        if (props != null && (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+            props.containsKey(Sasl.POLICY_NOACTIVE)))
         {
             // returned array must be non null according to interface documentation
             return new String[0];

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Sun Aug 14 17:14:51 2011
@@ -23,25 +23,9 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
 import static org.apache.qpid.util.Serial.gt;
 
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.security.auth.Subject;
+import com.sun.security.auth.UserPrincipal;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageTransfer;
@@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig,
AMQSessionModel, LogSubject
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel,
LogSubject
 {
-    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-    
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
     private final UUID _id;
@@ -111,12 +106,13 @@ public class ServerSession extends Sessi
             new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
 
     private ServerTransaction _transaction;
-    
+
     private final AtomicLong _txnStarts = new AtomicLong(0);
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
+
+    private Principal _principal;
 
     private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String,
Subscription_0_10>();
 
@@ -129,27 +125,27 @@ public class ServerSession extends Sessi
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
     }
 
-    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long
expiry, ConnectionConfig connConfig)
-    {
-        super(connection, delegate, name, expiry);
-        _connectionConfig = connConfig;        
-        _transaction = new AutoCommitTransaction(this.getMessageStore());
-
-        _reference = new WeakReference<Session>(this);
-        _id = getConfigStore().createId();
-        getConfigStore().addConfiguredObject(this);
-    }
-
     protected void setState(State state)
     {
         super.setState(state);
 
         if (state == State.OPEN)
         {
-            _actor.message(ChannelMessages.CREATE());
+	        _actor.message(ChannelMessages.CREATE());
         }
     }
 
+    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long
expiry, ConnectionConfig connConfig)
+    {
+        super(connection, delegate, name, expiry);
+        _connectionConfig = connConfig;
+        _transaction = new AutoCommitTransaction(this.getMessageStore());
+        _principal = new UserPrincipal(connection.getAuthorizationID());
+        _reference = new WeakReference(this);
+        _id = getConfigStore().createId();
+        getConfigStore().addConfiguredObject(this);
+    }
+
     private ConfigStore getConfigStore()
     {
         return getConnectionConfig().getConfigStore();
@@ -164,8 +160,8 @@ public class ServerSession extends Sessi
 
     public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue>
queues)
     {
-        getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
-        _transaction.enqueue(queues,message, new ServerTransaction.Action()
+
+            _transaction.enqueue(queues,message, new ServerTransaction.Action()
             {
 
                 BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -193,7 +189,6 @@ public class ServerSession extends Sessi
             });
 
             incrementOutstandingTxnsIfNecessary();
-            updateTransactionalActivity();
     }
 
 
@@ -201,7 +196,6 @@ public class ServerSession extends Sessi
                             Runnable postIdSettingAction)
     {
         invoke(xfr, postIdSettingAction);
-        getConnectionModel().registerMessageDelivered(xfr.getBodySize());
     }
 
     public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener
acceptListener)
@@ -337,7 +331,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    public void removeDispositionListener(Method method)                               
+    public void removeDispositionListener(Method method)
     {
         _messageDispositionListenerMap.remove(method.getId());
     }
@@ -357,7 +351,7 @@ public class ServerSession extends Sessi
         {
             task.doTask(this);
         }
-        
+
         CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
     }
 
@@ -383,7 +377,6 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
-	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -403,7 +396,7 @@ public class ServerSession extends Sessi
 
     public void unregister(Subscription_0_10 sub)
     {
-        _subscriptions.remove(sub.getConsumerTag().toString());
+        _subscriptions.remove(sub.getName());
         try
         {
             sub.getSendLock();
@@ -417,14 +410,14 @@ public class ServerSession extends Sessi
         catch (AMQException e)
         {
             // TODO
-            _logger.error("Failed to unregister subscription", e);
+            e.printStackTrace();  //To change body of catch statement use File | Settings
| File Templates.
         }
         finally
         {
             sub.releaseSendLock();
         }
     }
-    
+
     public boolean isTransactional()
     {
         // this does not look great but there should only be one "non-transactional"
@@ -432,11 +425,6 @@ public class ServerSession extends Sessi
         // theory
         return !(_transaction instanceof AutoCommitTransaction);
     }
-    
-    public boolean inTransaction()
-    {
-        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime()
> 0;
-    }
 
     public void selectTx()
     {
@@ -447,7 +435,7 @@ public class ServerSession extends Sessi
     public void commit()
     {
         _transaction.commit();
-        
+
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
@@ -456,13 +444,13 @@ public class ServerSession extends Sessi
     public void rollback()
     {
         _transaction.rollback();
-        
+
         _txnRejects.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
     }
 
-    
+
     private void incrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -472,7 +460,7 @@ public class ServerSession extends Sessi
             _txnCount.compareAndSet(0,1);
         }
     }
-    
+
     private void decrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -483,17 +471,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    /**
-     * Update last transaction activity timestamp
-     */
-    public void updateTransactionalActivity()
-    {
-        if (isTransactional())
-        {
-            _txnUpdateTime.set(System.currentTimeMillis());
-        }
-    }
-
     public Long getTxnStarts()
     {
         return _txnStarts.get();
@@ -513,15 +490,10 @@ public class ServerSession extends Sessi
     {
         return _txnCount.get();
     }
-    
-    public Principal getAuthorizedPrincipal()
-    {
-        return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
-    }
-    
-    public Subject getAuthorizedSubject()
+
+    public Principal getPrincipal()
     {
-        return ((ServerConnection) getConnection()).getAuthorizedSubject();
+        return _principal;
     }
 
     public void addSessionCloseTask(Task task)
@@ -634,47 +606,17 @@ public class ServerSession extends Sessi
         return (LogSubject) this;
     }
 
-    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long
idleClose) throws AMQException
-    {
-        if (inTransaction())
-        {
-            long currentTime = System.currentTimeMillis();
-            long openTime = currentTime - _transaction.getTransactionStartTime();
-            long idleTime = currentTime - _txnUpdateTime.get();
-
-            // Log a warning on idle or open transactions
-            if (idleWarn > 0L && idleTime > idleWarn)
-            {
-                CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime));
-                _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " "
+ idleTime + " ms");
-            }
-            else if (openWarn > 0L && openTime > openWarn)
-            {
-                CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
-                _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " "
+ openTime + " ms");
-            }
-
-            // Close connection for idle or open transactions that have timed out
-            if (idleClose > 0L && idleTime > idleClose)
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle
transaction timed out");
-            }
-            else if (openClose > 0L && openTime > openClose)
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open
transaction timed out");
-            }
-        }
-    }
-
     public String toLogString()
     {
        return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
-                                   ((ServerConnection) getConnection()).getConnectionId(),
+                                   getConnection().getConnectionId(),
                                    getClientID(),
                                    ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
                                    getVirtualHost().getName(),
                                    getChannel())
             + "] ";
+
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
Sun Aug 14 17:14:51 2011
@@ -20,16 +20,12 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
@@ -61,8 +57,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -71,7 +66,7 @@ import org.apache.qpid.server.registry.A
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
@@ -99,7 +94,7 @@ public class VirtualHostImpl implements 
 
     private AMQBrokerManagerMBean _brokerMBean;
 
-    private final AuthenticationManager _authenticationManager;
+    private AuthenticationManager _authenticationManager;
 
     private SecurityManager _securityManager;
 
@@ -111,12 +106,11 @@ public class VirtualHostImpl implements 
     private BrokerConfig _broker;
     private UUID _id;
 
-    private boolean _statisticsEnabled = false;
-    private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private final long _createTime = System.currentTimeMillis();
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink,
BrokerLink>();
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
+    private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String,
LinkRegistry>();
 
     public IConnectionRegistry getConnectionRegistry()
     {
@@ -163,12 +157,12 @@ public class VirtualHostImpl implements 
 
         public String getObjectInstanceName()
         {
-            return ObjectName.quote(_name);
+            return _name.toString();
         }
 
         public String getName()
         {
-            return _name;
+            return _name.toString();
         }
 
         public VirtualHostImpl getVirtualHost()
@@ -177,11 +171,22 @@ public class VirtualHostImpl implements 
         }
     }
 
-    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig,
MessageStore store) throws Exception
+    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig)
throws Exception
+    {
+        this(appRegistry, hostConfig, null);
+    }
+
+
+    public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws
Exception
+    {
+        this(ApplicationRegistry.getInstance(),hostConfig,store);
+    }
+
+    private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig,
MessageStore store) throws Exception
     {
 		if (hostConfig == null)
 		{
-			throw new IllegalArgumentException("HostConfig cannot be null");
+			throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
 		}
 		
         _appRegistry = appRegistry;
@@ -235,13 +240,11 @@ public class VirtualHostImpl implements 
 			initialiseMessageStore(hostConfig);
         }
 		
-        _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
+        _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, _configuration);
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
         initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
-        
-        initialiseStatistics();
     }
 
 	private void initialiseHouseKeeping(long period)
@@ -274,30 +277,19 @@ public class VirtualHostImpl implements 
                             // house keeping task from running.
                         }
                     }
-                    for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
-                    {
-                        _logger.debug("Checking for long running open transactions on connection
" + connection);
-                        for (AMQSessionModel session : connection.getSessionModels())
-                        {
-	                        _logger.debug("Checking for long running open transactions on session
" + session);
-                            try
-                            {
-                                session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
-	                                                           _configuration.getTransactionTimeoutOpenClose(),
-	                                                           _configuration.getTransactionTimeoutIdleWarn(),
-	                                                           _configuration.getTransactionTimeoutIdleClose());
-                            }
-                            catch (Exception e)
-                            {
-                                _logger.error("Exception in housekeeping for connection:
" + connection.toString(), e);
-                            }
-                        }
-                    }
                 }
             }
 
             scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
 
+            class ForceChannelClosuresTask extends TimerTask
+            {
+                public void run()
+                {
+                    _connectionRegistry.expireClosedChannels();
+                }
+            }
+
             Map<String, VirtualHostPluginFactory> plugins =
                 ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
 
@@ -450,57 +442,46 @@ public class VirtualHostImpl implements 
     private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException,
ConfigurationException
     {
     	AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
-        String queueName = queue.getName();
 
     	if (queue.isDurable())
     	{
     		getDurableConfigurationStore().createQueue(queue);
     	}
 
-        //get the exchange name (returns default exchange name if none was specified)
     	String exchangeName = queueConfiguration.getExchange();
 
-        Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
+    	Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new
AMQShortString(exchangeName));
+
+        if (exchange == null)
+        {
+            exchange = _exchangeRegistry.getDefaultExchange();
+        }
+
     	if (exchange == null)
     	{
-            throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to
unknown exchange:" + exchangeName);
+    		throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
     	}
 
-        Exchange defaultExchange = _exchangeRegistry.getDefaultExchange();
-
-        //get routing keys in configuration (returns empty list if none are defined)
-        List<?> routingKeys = queueConfiguration.getRoutingKeys();
+        List routingKeys = queueConfiguration.getRoutingKeys();
+        if (routingKeys == null || routingKeys.isEmpty())
+        {
+            routingKeys = Collections.singletonList(queue.getNameShortString());
+        }
 
         for (Object routingKeyNameObj : routingKeys)
         {
-            String routingKey = String.valueOf(routingKeyNameObj);
-
-            if (exchange.equals(defaultExchange) && !queueName.equals(routingKey))
+            AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
+            if (_logger.isInfoEnabled())
             {
-                throw new ConfigurationException("Illegal attempt to bind queue '" + queueName
+
-                        "' to the default exchange with a key other than the queue name:
" + routingKey);
+                _logger.info("Binding queue:" + queue + " with routing key '" + routingKey
+ "' to exchange:" + this);
             }
-
-            configureBinding(queue, exchange, routingKey);
-        }
-
-        if (!exchange.equals(defaultExchange))
-        {
-            //bind the queue to the named exchange using its name
-            configureBinding(queue, exchange, queueName);
+            _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
         }
 
-        //ensure the queue is bound to the default exchange using its name
-        configureBinding(queue, defaultExchange, queueName);
-    }
-
-    private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws
AMQException
-    {
-        if (_logger.isInfoEnabled())
+        if (exchange != _exchangeRegistry.getDefaultExchange())
         {
-            _logger.info("Binding queue:" + queue + " with routing key '" + routingKey +
"' to exchange:" + exchange.getName());
+            _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange,
null);
         }
-        _bindingFactory.addBinding(routingKey, queue, exchange, null);
     }
 
     public String getName()
@@ -642,80 +623,6 @@ public class VirtualHostImpl implements 
     {
         return _bindingFactory;
     }
-    
-    public void registerMessageDelivered(long messageSize)
-    {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
-        _appRegistry.registerMessageDelivered(messageSize);
-    }
-    
-    public void registerMessageReceived(long messageSize, long timestamp)
-    {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
-        _appRegistry.registerMessageReceived(messageSize, timestamp);
-    }
-    
-    public StatisticsCounter getMessageReceiptStatistics()
-    {
-        return _messagesReceived;
-    }
-    
-    public StatisticsCounter getDataReceiptStatistics()
-    {
-        return _dataReceived;
-    }
-    
-    public StatisticsCounter getMessageDeliveryStatistics()
-    {
-        return _messagesDelivered;
-    }
-    
-    public StatisticsCounter getDataDeliveryStatistics()
-    {
-        return _dataDelivered;
-    }
-    
-    public void resetStatistics()
-    {
-        _messagesDelivered.reset();
-        _dataDelivered.reset();
-        _messagesReceived.reset();
-        _dataReceived.reset();
-        
-        for (AMQConnectionModel connection : _connectionRegistry.getConnections())
-        {
-            connection.resetStatistics();
-        }
-    }
-
-    public void initialiseStatistics()
-    {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-        
-        _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
-        _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
-        _messagesReceived = new StatisticsCounter("messages-received-" + getName());
-        _dataReceived = new StatisticsCounter("bytes-received-" + getName());
-    }
-
-    public boolean isStatisticsEnabled()
-    {
-        return _statisticsEnabled;
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _statisticsEnabled = enabled;
-    }
 
     public void createBrokerConnection(final String transport,
                                        final String host,
@@ -752,6 +659,17 @@ public class VirtualHostImpl implements 
         }
     }
 
+    public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+    {
+        LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
+        if(linkRegistry == null)
+        {
+            linkRegistry = new LinkRegistry();
+            _linkRegistry.put(remoteContainerId, linkRegistry);
+        }
+        return linkRegistry;
+    }
+
     public ConfigStore getConfigStore()
     {
         return getApplicationRegistry().getConfigStore();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Sun Aug 14 17:14:51 2011
@@ -100,14 +100,14 @@ public class AbstractHeadersExchangeTest
 
         return bind(queueName, queueName, getHeadersMap(bindings));
     }
-    
+
     protected void unbind(TestQueue queue, String... bindings) throws AMQException
     {
         String queueName = queue.getName();
         //TODO - check this
         exchange.onUnbind(new Binding(null,queueName, queue, exchange, getHeadersMap(bindings)));
     }
-    
+
     protected int getCount()
     {
         return count;
@@ -120,7 +120,7 @@ public class AbstractHeadersExchangeTest
         exchange.onBind(new Binding(null,key, queue, exchange, args));
         return queue;
     }
-    
+
 
     protected int route(Message m) throws AMQException
     {
@@ -175,14 +175,14 @@ public class AbstractHeadersExchangeTest
             }
 
     }
-    
+
     static Map<String,Object> getHeadersMap(String... entries)
     {
         if(entries == null)
         {
             return null;
         }
-        
+
         Map<String,Object> headers = new HashMap<String,Object>();
 
         for (String s : entries)
@@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTest
     static ContentHeaderBody getContentHeader(FieldTable headers)
     {
         ContentHeaderBody header = new ContentHeaderBody();
-        header.setProperties(getProperties(headers));
+        header.properties = getProperties(headers);
         return header;
     }
 
@@ -428,11 +428,21 @@ public class AbstractHeadersExchangeTest
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public boolean isRejectedBy(long subscriptionId)
+                public void reject(Subscription subscription)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isRejectedBy(Subscription subscription)
                 {
                     return false;  //To change body of implemented methods use File | Settings
| File Templates.
                 }
 
+                public void requeue(Subscription subscription)
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public void dequeue()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
@@ -472,16 +482,6 @@ public class AbstractHeadersExchangeTest
                 {
                     return 0;  //To change body of implemented methods use File | Settings
| File Templates.
                 }
-
-                public boolean isDequeued()
-                {
-                    return false;
-                }
-
-                public boolean isDispensed()
-                {
-                    return false;
-                }
             };
 
             if(action != null)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
(original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
Sun Aug 14 17:14:51 2011
@@ -31,10 +31,10 @@ import org.apache.qpid.server.message.Se
 /**
  * Mock Server Message allowing its persistent flag to be controlled from test.
  */
-class MockServerMessage implements ServerMessage
+class MockServerMessage implements ServerMessage<MockServerMessage>
 {
     /**
-     * 
+     *
      */
     private final boolean persistent;
 
@@ -46,56 +46,67 @@ class MockServerMessage implements Serve
         this.persistent = persistent;
     }
 
+
     public boolean isPersistent()
     {
         return persistent;
     }
 
-    public MessageReference newReference()
+
+    public MessageReference<MockServerMessage> newReference()
     {
         throw new NotImplementedException();
     }
 
+
     public boolean isImmediate()
     {
         throw new NotImplementedException();
     }
 
+
     public long getSize()
     {
         throw new NotImplementedException();
     }
 
+
     public SessionConfig getSessionConfig()
     {
         throw new NotImplementedException();
     }
 
+
     public String getRoutingKey()
     {
         throw new NotImplementedException();
     }
 
+
     public AMQMessageHeader getMessageHeader()
     {
         throw new NotImplementedException();
     }
 
+
     public long getExpiration()
     {
         throw new NotImplementedException();
     }
 
+
     public int getContent(ByteBuffer buf, int offset)
     {
         throw new NotImplementedException();
     }
 
+
     public long getArrivalTime()
     {
         throw new NotImplementedException();
     }
 
+
     public Long getMessageNumber()
     {
         return 0L;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message