qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1663717 [2/6] - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: ./ amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-common/src/main/java/org/a...
Date Tue, 03 Mar 2015 14:56:42 GMT
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Tue Mar  3 14:56:40 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.binding;
 
-import java.security.AccessControlException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,10 +44,8 @@ import org.apache.qpid.server.model.Mana
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.util.StateChangeListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class BindingImpl
         extends AbstractConfiguredObject<BindingImpl>
@@ -108,26 +105,6 @@ public class BindingImpl
         }
     }
 
-    @Override
-    protected void onCreate()
-    {
-        super.onCreate();
-        try
-        {
-            _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
-        }
-        catch(AccessControlException e)
-        {
-            deleted();
-            throw e;
-        }
-        if (isDurable())
-        {
-            _queue.getVirtualHost().getDurableConfigurationStore().create(asObjectRecord());
-        }
-
-    }
-
     private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes,
                                                           final AMQQueue queue,
                                                           final ExchangeImpl exchange)
@@ -263,12 +240,6 @@ public class BindingImpl
                     {
                         _arguments = arguments;
                         BindingImpl.super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
-                        if (isDurable())
-                        {
-                            VirtualHostImpl<?, ?, ?> vhost =
-                                    (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
-                            vhost.getDurableConfigurationStore().update(true, asObjectRecord());
-                        }
                     }
                 }
                );
@@ -278,6 +249,8 @@ public class BindingImpl
     @Override
     public void validateOnCreate()
     {
+        _queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this);
+
         AMQQueue queue = getAMQQueue();
         Map<String, Object> arguments = getArguments();
         if (arguments!=null && !arguments.isEmpty() && FilterSupport.argumentsContainFilter(arguments))

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java Tue Mar  3 14:56:40 2015
@@ -48,6 +48,7 @@ public class BrokerProperties
     public static final String PROPERTY_QPID_HOME = "QPID_HOME";
     public static final String PROPERTY_QPID_WORK = "QPID_WORK";
     public static final String PROPERTY_LOG_RECORDS_BUFFER_SIZE = "qpid.broker_log_records_buffer_size";
+    public static final String POSIX_FILE_PERMISSIONS = "qpid.default_posix_file_permissions";
 
     private BrokerProperties()
     {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java Tue Mar  3 14:56:40 2015
@@ -25,7 +25,6 @@ import java.util.Collection;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 
 public class StoreConfigurationChangeListener implements ConfigurationChangeListener
@@ -43,7 +42,10 @@ public class StoreConfigurationChangeLis
     {
         if (newState == State.DELETED)
         {
-            _store.remove(object.asObjectRecord());
+            if(object.isDurable())
+            {
+                _store.remove(object.asObjectRecord());
+            }
             object.removeChangeListener(this);
         }
     }
@@ -51,20 +53,23 @@ public class StoreConfigurationChangeLis
     @Override
     public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
     {
-        // exclude VirtualHostNode children from storing in broker store
-        if (!(object instanceof VirtualHostNode))
+        if (!object.managesChildStorage())
         {
-            child.addChangeListener(this);
-            _store.update(true,child.asObjectRecord());
+            if(object.isDurable() && child.isDurable())
+            {
+                child.addChangeListener(this);
+                _store.update(true, child.asObjectRecord());
 
-            Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
-            Collection<Class<? extends ConfiguredObject>> childTypes = child.getModel().getChildTypes(categoryClass);
+                Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
+                Collection<Class<? extends ConfiguredObject>> childTypes =
+                        child.getModel().getChildTypes(categoryClass);
 
-            for(Class<? extends ConfiguredObject> childClass : childTypes)
-            {
-                for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+                for (Class<? extends ConfiguredObject> childClass : childTypes)
                 {
-                    childAdded(child, grandchild);
+                    for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+                    {
+                        childAdded(child, grandchild);
+                    }
                 }
             }
         }
@@ -74,14 +79,20 @@ public class StoreConfigurationChangeLis
     @Override
     public void childRemoved(ConfiguredObject object, ConfiguredObject child)
     {
-        _store.remove(child.asObjectRecord());
+        if(child.isDurable())
+        {
+            _store.remove(child.asObjectRecord());
+        }
         child.removeChangeListener(this);
     }
 
     @Override
     public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
     {
-        _store.update(false, object.asObjectRecord());
+        if(object.isDurable())
+        {
+            _store.update(false, object.asObjectRecord());
+        }
     }
 
     @Override

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Mar  3 14:56:40 2015
@@ -177,17 +177,6 @@ public abstract class AbstractExchange<T
     }
 
     @Override
-    protected void onCreate()
-    {
-        super.onCreate();
-        if(isDurable())
-        {
-            getVirtualHost().getDurableConfigurationStore().create(asObjectRecord());
-        }
-
-    }
-
-    @Override
     public EventLogger getEventLogger()
     {
         return _virtualHost.getEventLogger();
@@ -213,12 +202,6 @@ public abstract class AbstractExchange<T
             throw new RequiredExchangeException(getName());
         }
 
-        if (isDurable() && !isAutoDelete())
-        {
-            getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord());
-
-        }
-
         if(_closed.compareAndSet(false,true))
         {
             List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings);
@@ -241,11 +224,6 @@ public abstract class AbstractExchange<T
             }
             _closeTaskList.clear();
 
-            if (isDurable() && !isAutoDelete())
-            {
-                getVirtualHost().getDurableConfigurationStore().remove(asObjectRecord());
-
-            }
         }
         deleted();
     }
@@ -665,10 +643,6 @@ public abstract class AbstractExchange<T
             doRemoveBinding(b);
             queue.removeBinding(b);
 
-            if (b.isDurable())
-            {
-                _virtualHost.getDurableConfigurationStore().remove(b.asObjectRecord());
-            }
             b.delete();
         }
 
@@ -905,10 +879,6 @@ public abstract class AbstractExchange<T
     protected void changeAttributes(final Map<String, Object> attributes)
     {
         super.changeAttributes(attributes);
-        if (isDurable() && getState() != State.DELETED)
-        {
-            this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
-        }
     }
 
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java Tue Mar  3 14:56:40 2015
@@ -62,7 +62,8 @@ public class DefaultDestination implemen
         final AMQQueue q = _virtualHost.getQueue(routingAddress);
         if(q == null)
         {
-            if(routingAddress != null && routingAddress.contains("/") && !routingAddress.startsWith("/"))
+            routingAddress = _virtualHost.getLocalAddress(routingAddress);
+            if(routingAddress.contains("/") && !routingAddress.startsWith("/"))
             {
                 String[] parts = routingAddress.split("/",2);
                 ExchangeImpl exchange = _virtualHost.getExchange(parts[0]);
@@ -71,7 +72,7 @@ public class DefaultDestination implemen
                     return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction);
                 }
             }
-            else if(routingAddress == null || !routingAddress.contains("/"))
+            else if(!routingAddress.contains("/"))
             {
                 ExchangeImpl exchange = _virtualHost.getExchange(routingAddress);
                 if(exchange != null)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Mar  3 14:56:40 2015
@@ -87,6 +87,12 @@ class HeadersBinding
                              +"' with arguments: " + _binding.getArguments());
                 _filter = new MessageFilter()
                     {
+                    @Override
+                        public String getName()
+                        {
+                            return "";
+                        }
+
                         @Override
                         public boolean matches(Filterable message)
                         {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java Tue Mar  3 14:56:40 2015
@@ -14,26 +14,62 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.
- *
+ *  under the License.    
  *
+ * 
  */
 package org.apache.qpid.server.filter;
-//
-// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
-//
 
 import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-public interface FilterManager
+public class FilterManager
 {
-    void add(MessageFilter filter);
 
-    void remove(MessageFilter filter);
+    private final Map<String, MessageFilter> _filters = new ConcurrentHashMap<>();
 
-    boolean allAllow(Filterable  msg);
+    public FilterManager()
+    {
+    }
+
+    public void add(String name, MessageFilter filter)
+    {
+        _filters.put(name, filter);
+    }
+
+    public boolean allAllow(Filterable msg)
+    {
+        for (MessageFilter filter : _filters.values())
+        {
+            if (!filter.matches(msg))
+            {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public Iterator<MessageFilter> filters()
+    {
+        return _filters.values().iterator();
+    }
+
+    public boolean hasFilters()
+    {
+        return !_filters.isEmpty();
+    }
+
+    public boolean hasFilter(final String name)
+    {
+        return _filters.containsKey(name);
+    }
+
+    @Override
+    public String toString()
+    {
+        return _filters.toString();
+    }
 
-    Iterator<MessageFilter> filters();
 
-    boolean hasFilters();
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java Tue Mar  3 14:56:40 2015
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.filter;
 
+import java.util.Map;
+
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.common.AMQPFilterTypes;
@@ -27,8 +29,6 @@ import org.apache.qpid.filter.SelectorPa
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
 
-import java.util.Map;
-
 
 public class FilterManagerFactory
 {
@@ -54,20 +54,13 @@ public class FilterManagerFactory
 
                 if (selector instanceof String && !selector.equals(""))
                 {
-                    manager = new SimpleFilterManager();
+                    manager = new FilterManager();
                     try
                     {
-                        manager.add(new JMSSelectorFilter((String)selector));
-                    }
-                    catch (ParseException e)
-                    {
-                        throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
-                    }
-                    catch (SelectorParsingException e)
-                    {
-                        throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
+                        MessageFilter filter = new JMSSelectorFilter((String)selector);
+                        manager.add(filter.getName(), filter);
                     }
-                    catch (TokenMgrError e)
+                    catch (ParseException | SelectorParsingException | TokenMgrError e)
                     {
                         throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selector + "\"", e);
                     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Tue Mar  3 14:56:40 2015
@@ -26,12 +26,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.WeakHashMap;
+
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.queue.AMQQueue;
 
 public class FilterSupport
@@ -57,15 +59,7 @@ public class FilterSupport
             {
                 selector = new JMSSelectorFilter(selectorString);
             }
-            catch (ParseException e)
-            {
-                throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
-            }
-            catch (SelectorParsingException e)
-            {
-                throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
-            }
-            catch (TokenMgrError e)
+            catch (ParseException | SelectorParsingException | TokenMgrError e)
             {
                 throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
             }
@@ -119,6 +113,7 @@ public class FilterSupport
         }
     }
 
+    @PluggableService
     public static final class NoLocalFilter implements MessageFilter
     {
         private final MessageSource _queue;
@@ -128,6 +123,12 @@ public class FilterSupport
             _queue = queue;
         }
 
+        @Override
+        public String getName()
+        {
+            return AMQPFilterTypes.NO_LOCAL.toString();
+        }
+
         public boolean matches(Filterable message)
         {
 
@@ -165,6 +166,8 @@ public class FilterSupport
         {
             return _queue != null ? _queue.hashCode() : 0;
         }
+
+
     }
 
     static final class CompoundFilter implements MessageFilter
@@ -178,6 +181,12 @@ public class FilterSupport
             _jmsSelectorFilter = jmsSelectorFilter;
         }
 
+        @Override
+        public String getName()
+        {
+            return "";
+        }
+
         public boolean matches(Filterable message)
         {
             return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
@@ -216,5 +225,7 @@ public class FilterSupport
             result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
             return result;
         }
+
+
     }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java Tue Mar  3 14:56:40 2015
@@ -34,6 +34,10 @@ public interface Filterable
 
     Object getConnectionReference();
 
+    long getMessageNumber();
+
+    long getArrivalTime();
+
     public class Factory
     {
 
@@ -41,6 +45,7 @@ public interface Filterable
         {
             return new Filterable()
             {
+
                 @Override
                 public AMQMessageHeader getMessageHeader()
                 {
@@ -64,6 +69,18 @@ public interface Filterable
                 {
                     return message.getConnectionReference();
                 }
+
+                @Override
+                public long getMessageNumber()
+                {
+                    return message.getMessageNumber();
+                }
+
+                @Override
+                public long getArrivalTime()
+                {
+                    return message.getArrivalTime();
+                }
             };
         }
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java Tue Mar  3 14:56:40 2015
@@ -25,14 +25,18 @@ import org.apache.commons.lang.builder.H
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.log4j.Logger;
+
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.filter.BooleanExpression;
 import org.apache.qpid.filter.FilterableMessage;
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.SelectorParser;
 import org.apache.qpid.filter.selector.TokenMgrError;
+import org.apache.qpid.server.plugin.PluggableService;
 
 
+@PluggableService
 public class JMSSelectorFilter implements MessageFilter
 {
     private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
@@ -46,6 +50,12 @@ public class JMSSelectorFilter implement
         _matcher = new SelectorParser().parse(selector);
     }
 
+    @Override
+    public String getName()
+    {
+        return AMQPFilterTypes.JMS_SELECTOR.toString();
+    }
+
     public boolean matches(Filterable message)
     {
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java Tue Mar  3 14:56:40 2015
@@ -22,5 +22,6 @@ package org.apache.qpid.server.filter;
 
 public interface MessageFilter
 {
+    String getName();
     boolean matches(Filterable message);
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Mar  3 14:56:40 2015
@@ -122,8 +122,11 @@ public abstract class AbstractConfigured
     private final TaskExecutor _taskExecutor;
 
     private final Class<? extends ConfiguredObject> _category;
+    private final Class<? extends ConfiguredObject> _typeClass;
     private final Class<? extends ConfiguredObject> _bestFitInterface;
     private final Model _model;
+    private final boolean _managesChildStorage;
+
 
     @ManagedAttributeField
     private long _createdTime;
@@ -206,6 +209,8 @@ public abstract class AbstractConfigured
         _model = model;
 
         _category = ConfiguredObjectTypeRegistry.getCategory(getClass());
+        Class<? extends ConfiguredObject> typeClass = model.getTypeRegistry().getTypeClass(getClass());
+        _typeClass = typeClass == null ? _category : typeClass;
 
         _attributeTypes = model.getTypeRegistry().getAttributeTypes(getClass());
         _automatedFields = model.getTypeRegistry().getAutomatedFields(getClass());
@@ -242,6 +247,7 @@ public abstract class AbstractConfigured
         }
 
         _type = ConfiguredObjectTypeRegistry.getType(getClass());
+        _managesChildStorage = managesChildren(_category) || managesChildren(_typeClass);
         _bestFitInterface = calculateBestFitInterface();
 
         if(attributes.get(TYPE) != null && !_type.equals(attributes.get(TYPE)))
@@ -315,6 +321,11 @@ public abstract class AbstractConfigured
         }
     }
 
+    private boolean managesChildren(final Class<? extends ConfiguredObject> clazz)
+    {
+        return clazz.getAnnotation(ManagedObject.class).managesChildren();
+    }
+
     private Class<? extends ConfiguredObject> calculateBestFitInterface()
     {
         Set<Class<? extends ConfiguredObject>> candidates = new HashSet<Class<? extends ConfiguredObject>>();
@@ -1056,11 +1067,24 @@ public abstract class AbstractConfigured
         return _model;
     }
 
+    @Override
     public Class<? extends ConfiguredObject> getCategoryClass()
     {
         return _category;
     }
 
+    @Override
+    public Class<? extends ConfiguredObject> getTypeClass()
+    {
+        return _typeClass;
+    }
+
+    @Override
+    public boolean managesChildStorage()
+    {
+        return _managesChildStorage;
+    }
+
     public Map<String,String> getContext()
     {
         return _context == null ? Collections.<String,String>emptyMap() : Collections.unmodifiableMap(_context);
@@ -1219,8 +1243,7 @@ public abstract class AbstractConfigured
         if(attr != null && (attr.isAutomated() || attr.isDerived()))
         {
             Object value = attr.getValue((X)this);
-            if(value != null && attr.isSecure() &&
-               !SecurityManager.isSystemProcess())
+            if(value != null && !SecurityManager.isSystemProcess() && attr.isSecureValue(value))
             {
                 return SECURE_VALUES.get(value.getClass());
             }
@@ -1620,8 +1643,9 @@ public abstract class AbstractConfigured
             {
                 Object desired = attributes.get(name);
                 Object expected = getAttribute(name);
-                if(((_attributes.get(name) != null && !_attributes.get(name).equals(attributes.get(name)))
-                     || attributes.get(name) != null)
+                Object currentValue = _attributes.get(name);
+                if(((currentValue != null && !currentValue.equals(desired))
+                     || (currentValue == null && desired != null))
                     && changeAttribute(name, expected, desired))
                 {
                     attributeSet(name, expected, desired);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java Tue Mar  3 14:56:40 2015
@@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T
         }
     };
 
+    static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>()
+    {
+        @Override
+        public Object convert(final Object value, final ConfiguredObject object)
+        {
+            if(value instanceof String)
+            {
+                return AbstractConfiguredObject.interpolate(object, (String) value);
+            }
+            else if(value == null)
+            {
+                return null;
+            }
+            else
+            {
+                return value;
+            }
+        }
+    };
     static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>()
     {
         @Override
@@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T
         }
         else if(Map.class.isAssignableFrom(type))
         {
-            return (AttributeValueConverter<X>) MAP_CONVERTER;
+            if(returnType instanceof ParameterizedType)
+            {
+                Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
+                Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1];
+
+                return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType);
+            }
+            else
+            {
+                return (AttributeValueConverter<X>) MAP_CONVERTER;
+            }
         }
         else if(Collection.class.isAssignableFrom(type))
         {
@@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T
         {
             return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type);
         }
+        else if(Object.class == type)
+        {
+            return (AttributeValueConverter<X>) OBJECT_CONVERTER;
+        }
         throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName());
     }
 
@@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T
         }
     }
 
+    public static class GenericMapConverter extends AttributeValueConverter<Map>
+    {
+
+        private final AttributeValueConverter<?> _keyConverter;
+        private final AttributeValueConverter<?> _valueConverter;
+
+
+        public GenericMapConverter(final Type keyType, final Type valueType)
+        {
+            _keyConverter = getConverter(getRawType(keyType), keyType);
+
+            _valueConverter = getConverter(getRawType(valueType), valueType);
+        }
+
+
+        @Override
+        public Map convert(final Object value, final ConfiguredObject object)
+        {
+            if(value instanceof Map)
+            {
+                Map<?,?> original = (Map<?,?>)value;
+                Map converted = new LinkedHashMap(original.size());
+                for(Map.Entry<?,?> entry : original.entrySet())
+                {
+                    converted.put(_keyConverter.convert(entry.getKey(),object),
+                                  _valueConverter.convert(entry.getValue(), object));
+                }
+                return Collections.unmodifiableMap(converted);
+            }
+            else if(value == null)
+            {
+                return null;
+            }
+            else
+            {
+                if(value instanceof String)
+                {
+                    String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    try
+                    {
+                        return convert(objectMapper.readValue(interpolated, Map.class), object);
+                    }
+                    catch (IOException e)
+                    {
+                        // fall through to the non-JSON single object case
+                    }
+                }
+
+                throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map");
+            }
+
+        }
+    }
+
+
     static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X>
     {
         private final Class<X> _klazz;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredAutomatedAttribute.java Tue Mar  3 14:56:40 2015
@@ -28,6 +28,7 @@ import java.lang.reflect.Type;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.regex.Pattern;
 
 import org.apache.log4j.Logger;
 
@@ -37,6 +38,7 @@ public class ConfiguredAutomatedAttribut
 
     private final ManagedAttribute _annotation;
     private final Method _validValuesMethod;
+    private final Pattern _secureValuePattern;
 
     ConfiguredAutomatedAttribute(final Class<C> clazz,
                                  final Method getter,
@@ -53,6 +55,16 @@ public class ConfiguredAutomatedAttribut
             validValuesMethod = getValidValuesMethod(validValue, clazz);
         }
         _validValuesMethod = validValuesMethod;
+
+        String secureValueFilter = _annotation.secureValueFilter();
+        if (secureValueFilter == null || "".equals(secureValueFilter))
+        {
+            _secureValuePattern = null;
+        }
+        else
+        {
+            _secureValuePattern = Pattern.compile(secureValueFilter);
+        }
     }
 
     private Method getValidValuesMethod(final String validValue, final Class<C> clazz)
@@ -140,6 +152,11 @@ public class ConfiguredAutomatedAttribut
         return _annotation.description();
     }
 
+    public Pattern getSecureValueFilter()
+    {
+        return _secureValuePattern;
+    }
+
     public Collection<String> validValues()
     {
         if(_validValuesMethod != null)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredDerivedAttribute.java Tue Mar  3 14:56:40 2015
@@ -21,10 +21,12 @@
 package org.apache.qpid.server.model;
 
 import java.lang.reflect.Method;
+import java.util.regex.Pattern;
 
 public class ConfiguredDerivedAttribute<C extends ConfiguredObject, T>  extends ConfiguredObjectAttribute<C,T>
 {
     private final DerivedAttribute _annotation;
+    private final Pattern _secureValuePattern;
 
     ConfiguredDerivedAttribute(final Class<C> clazz,
                                final Method getter,
@@ -32,6 +34,16 @@ public class ConfiguredDerivedAttribute<
     {
         super(clazz, getter);
         _annotation = annotation;
+
+        String secureValueFilter = _annotation.secureValueFilter();
+        if (secureValueFilter == null || "".equals(secureValueFilter))
+        {
+            _secureValuePattern = null;
+        }
+        else
+        {
+            _secureValuePattern = Pattern.compile(secureValueFilter);
+        }
     }
 
     public boolean isAutomated()
@@ -72,4 +84,10 @@ public class ConfiguredDerivedAttribute<
         return _annotation.description();
     }
 
+    @Override
+    public Pattern getSecureValueFilter()
+    {
+        return _secureValuePattern;
+    }
+
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Tue Mar  3 14:56:40 2015
@@ -239,6 +239,9 @@ public interface ConfiguredObject<X exte
     void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
 
     Class<? extends ConfiguredObject> getCategoryClass();
+    Class<? extends ConfiguredObject> getTypeClass();
+
+    boolean managesChildStorage();
 
     <C extends ConfiguredObject<C>> C findConfiguredObject(Class<C> clazz, String name);
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java Tue Mar  3 14:56:40 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.model;
 
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
+import java.util.regex.Pattern;
 
 public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T>
 {
@@ -49,6 +50,25 @@ public abstract class ConfiguredObjectAt
 
     public abstract String getDescription();
 
+    public abstract Pattern getSecureValueFilter();
+
+    public boolean isSecureValue(Object value)
+    {
+        if (isSecure())
+        {
+            Pattern filter = getSecureValueFilter();
+            if (filter == null)
+            {
+                return  true;
+            }
+            else
+            {
+                return filter.matcher(String.valueOf(value)).matches();
+            }
+        }
+        return false;
+    }
+
     public T convert(final Object value, C object)
     {
         final AttributeValueConverter<T> converter = getConverter();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java Tue Mar  3 14:56:40 2015
@@ -156,7 +156,7 @@ public class ConfiguredObjectFactoryImpl
             factory = categoryFactories.get(_defaultTypes.get(category));
             if(factory == null)
             {
-                throw new NoFactoryForTypeException(category, _defaultTypes.get(category));
+                throw new NoFactoryForTypeException(category, type);
             }
         }
         return factory;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java Tue Mar  3 14:56:40 2015
@@ -385,7 +385,7 @@ public class ConfiguredObjectTypeRegistr
         return null;
     }
 
-    private Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz)
+    public Class<? extends ConfiguredObject> getTypeClass(final Class<? extends ConfiguredObject> clazz)
     {
         String typeName = getType(clazz);
         Class<? extends ConfiguredObject> typeClass = null;

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/DerivedAttribute.java Tue Mar  3 14:56:40 2015
@@ -32,4 +32,5 @@ public @interface DerivedAttribute
     boolean persist() default false;
     String description() default "";
     boolean oversize() default false;
+    String secureValueFilter() default "";
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedAttribute.java Tue Mar  3 14:56:40 2015
@@ -37,4 +37,5 @@ public @interface ManagedAttribute
     String[] validValues() default {};
     boolean oversize() default false;
     String oversizedAltText() default "";
+    String secureValueFilter() default "";
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Mar  3 14:56:40 2015
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.model;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.apache.qpid.server.store.MessageDurability;
@@ -48,6 +50,8 @@ public interface Queue<X extends Queue<X
     String QUEUE_FLOW_STOPPED = "queueFlowStopped";
     String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
     String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
+    String DEFAULT_FILTERS = "defaultFilters";
+    String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
 
     String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
     @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -67,6 +71,9 @@ public interface Queue<X extends Queue<X
     @ManagedAttribute( defaultValue = "NONE" )
     ExclusivityPolicy getExclusive();
 
+    @ManagedAttribute( defaultValue = "false" )
+    boolean isEnsureNondestructiveConsumers();
+
     @DerivedAttribute( persist = true )
     String getOwner();
 
@@ -155,6 +162,9 @@ public interface Queue<X extends Queue<X
     @ManagedAttribute
     long getMaximumMessageTtl();
 
+    @ManagedAttribute
+    Map<String, Map<String,List<String>>> getDefaultFilters();
+
     //children
     Collection<? extends Binding> getBindings();
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/RemoteReplicationNode.java Tue Mar  3 14:56:40 2015
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.model;
 
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ManagedObject;
-
 @ManagedObject(category=true, managesChildren=false, creatable=false)
 public interface RemoteReplicationNode<X extends RemoteReplicationNode<X>> extends ConfiguredObject<X>
 {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemConfig.java Tue Mar  3 14:56:40 2015
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.model;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogRecorder;
 import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -37,6 +38,9 @@ public interface SystemConfig<X extends
     String INITIAL_CONFIGURATION_LOCATION = "initialConfigurationLocation";
     String STARTUP_LOGGED_TO_SYSTEM_OUT = "startupLoggedToSystemOut";
 
+    @ManagedContextDefault(name = BrokerProperties.POSIX_FILE_PERMISSIONS)
+    String DEFAULT_POSIX_FILE_PERMISSIONS = "rw-r-----";
+
     @ManagedAttribute(defaultValue = "false")
     boolean isManagementMode();
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Tue Mar  3 14:56:40 2015
@@ -22,14 +22,16 @@ package org.apache.qpid.server.model;
 
 import java.security.AccessControlException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.store.MessageStore;
 
-@ManagedObject( managesChildren = true, defaultType = "ProvidedStore")
+@ManagedObject( defaultType = "ProvidedStore")
 public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X>
 {
 
@@ -42,6 +44,9 @@ public interface VirtualHost<X extends V
     String STORE_TRANSACTION_OPEN_TIMEOUT_WARN  = "storeTransactionOpenTimeoutWarn";
     String HOUSE_KEEPING_THREAD_COUNT           = "houseKeepingThreadCount";
     String MODEL_VERSION                        = "modelVersion";
+    String ENABLED_CONNECTION_VALIDATORS        = "enabledConnectionValidators";
+    String DISABLED_CONNECTION_VALIDATORS       = "disabledConnectionValidators";
+    String GLOBAL_ADDRESS_DOMAINS               = "globalAddressDomains";
 
     @ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
     public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
@@ -88,6 +93,21 @@ public interface VirtualHost<X extends V
     @DerivedAttribute( persist = true )
     String getModelVersion();
 
+    @ManagedContextDefault( name = "virtualhost.enabledConnectionValidators")
+    String DEFAULT_ENABLED_VALIDATORS = "[]";
+
+    @ManagedAttribute( defaultValue = "${virtualhost.enabledConnectionValidators}")
+    List<String> getEnabledConnectionValidators();
+
+    @ManagedContextDefault( name = "virtualhost.disabledConnectionValidators")
+    String DEFAULT_DISABLED_VALIDATORS = "[]";
+
+    @ManagedAttribute( defaultValue = "${virtualhost.disabledConnectionValidators}")
+    List<String> getDisabledConnectionValidators();
+
+    @ManagedAttribute( defaultValue = "[]")
+    List<String> getGlobalAddressDomains();
+
     @ManagedStatistic
     long getQueueCount();
 
@@ -129,6 +149,8 @@ public interface VirtualHost<X extends V
 
     void delete();
 
+    String getRedirectHost(AmqpPort<?> port);
+
     public static interface Transaction
     {
         void dequeue(MessageInstance entry);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java Tue Mar  3 14:56:40 2015
@@ -24,7 +24,7 @@ import java.util.Collection;
 
 import org.apache.qpid.server.store.DurableConfigurationStore;
 
-@ManagedObject(category=true, managesChildren=false)
+@ManagedObject(category=true, managesChildren=true)
 public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X>
 {
     String QPID_INITIAL_CONFIG_VIRTUALHOST_CONFIG_VAR = "qpid.initial_config_virtualhost_config";

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Tue Mar  3 14:56:40 2015
@@ -80,7 +80,7 @@ public final class ConnectionAdapter ext
     {
         Map<String,Object> attributes = new HashMap<String, Object>();
         attributes.put(ID, UUID.randomUUID());
-        attributes.put(NAME, _connection.getRemoteAddressString().replaceAll("/", ""));
+        attributes.put(NAME, "[" + _connection.getConnectionId() + "] " + _connection.getRemoteAddressString().replaceAll("/", ""));
         attributes.put(DURABLE, false);
         return attributes;
     }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java Tue Mar  3 14:56:40 2015
@@ -25,7 +25,7 @@ import org.apache.qpid.server.model.Grou
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedObject;
 
-@ManagedObject( category = false, type = "GroupFile" )
+@ManagedObject( category = false, type = "GroupFile", managesChildren = true )
 public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>, GroupManagingGroupProvider
 {
     String PATH="path";

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java Tue Mar  3 14:56:40 2015
@@ -34,6 +34,7 @@ import java.util.UUID;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.group.FileGroupDatabase;
 import org.apache.qpid.server.security.group.GroupPrincipal;
+import org.apache.qpid.server.util.FileHelper;
 
 public class FileBasedGroupProviderImpl
         extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl>
@@ -162,9 +164,11 @@ public class FileBasedGroupProviderImpl
             {
                 throw new IllegalConfigurationException(String.format("Cannot create groups file at '%s'",_path));
             }
+
             try
             {
-                file.createNewFile();
+                String posixFileAttributes = getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS);
+                new FileHelper().createNewFile(file, posixFileAttributes);
             }
             catch (IOException e)
             {

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java Tue Mar  3 14:56:40 2015
@@ -21,14 +21,14 @@
 
 package org.apache.qpid.server.model.adapter;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +38,9 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonProcessingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -118,7 +121,7 @@ public class FileSystemPreferencesProvid
         FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path));
 
         // we need to check and create file if it does not exist every time on open
-        store.createIfNotExist();
+        store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
         store.open();
         _store = store;
         _open = true;
@@ -184,6 +187,7 @@ public class FileSystemPreferencesProvid
 
         if(_store != null)
         {
+            _store.close();
             _store.delete();
             deleted();
             _authenticationProvider.setPreferencesProvider(null);
@@ -280,7 +284,7 @@ public class FileSystemPreferencesProvid
             else
             {
                 FileSystemPreferencesStore store = new FileSystemPreferencesStore(new File(_path));
-                store.createIfNotExist();
+                store.createIfNotExist(getContextValue(String.class, BrokerProperties.POSIX_FILE_PERMISSIONS));
                 store.open();
                 _store = store;
             }
@@ -334,9 +338,9 @@ public class FileSystemPreferencesProvid
     {
         private final ObjectMapper _objectMapper;
         private final Map<String, Map<String, Object>> _preferences;
+        private final FileHelper _fileHelper;
         private File _storeFile;
         private FileLock _storeLock;
-        private RandomAccessFile _storeRAF;
 
         public FileSystemPreferencesStore(File preferencesFile)
         {
@@ -345,9 +349,10 @@ public class FileSystemPreferencesProvid
             _objectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
             _objectMapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
             _preferences = new TreeMap<String, Map<String, Object>>();
+            _fileHelper = new FileHelper();
         }
 
-        public void createIfNotExist()
+        public void createIfNotExist(String filePermissions)
         {
             if (!_storeFile.exists())
             {
@@ -358,7 +363,8 @@ public class FileSystemPreferencesProvid
                 }
                 try
                 {
-                    if (_storeFile.createNewFile() && !_storeFile.exists())
+                    Path path = _fileHelper.createNewFile(_storeFile, filePermissions);
+                    if (!Files.exists(path))
                     {
                         throw new IllegalConfigurationException(String.format("Cannot create preferences store file at '%s'", _storeFile.getAbsolutePath()));
                     }
@@ -391,43 +397,20 @@ public class FileSystemPreferencesProvid
             }
             try
             {
-                _storeRAF = new RandomAccessFile(_storeFile, "rw");
-                FileChannel fileChannel = _storeRAF.getChannel();
-                try
-                {
-                    _storeLock = fileChannel.tryLock();
-                }
-                catch (OverlappingFileLockException e)
-                {
-                    _storeLock = null;
-                }
-                if (_storeLock == null)
+                getFileLock(_storeFile.getPath() + ".lck");
+                if (_storeFile.length() > 0)
                 {
-                    throw new IllegalConfigurationException("Cannot get lock on store file " + _storeFile.getName()
-                            + " is another instance running?");
-                }
-                long fileSize = fileChannel.size();
-                if (fileSize > 0)
-                {
-                    ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);
-                    fileChannel.read(buffer);
-                    buffer.rewind();
-                    buffer.flip();
-                    byte[] data = buffer.array();
-                    try
-                    {
-                        Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(data,
-                                new TypeReference<Map<String, Map<String, Object>>>()
-                                {
-                                });
-                        _preferences.putAll(preferencesMap);
-                    }
-                    catch (JsonProcessingException e)
-                    {
-                        throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e);
-                    }
+                    Map<String, Map<String, Object>> preferencesMap = _objectMapper.readValue(_storeFile,
+                            new TypeReference<Map<String, Map<String, Object>>>()
+                            {
+                            });
+                    _preferences.putAll(preferencesMap);
                 }
             }
+            catch (JsonProcessingException e)
+            {
+                throw new IllegalConfigurationException("Cannot parse preferences json in " + _storeFile.getName(), e);
+            }
             catch (IOException e)
             {
                 throw new IllegalConfigurationException("Cannot load preferences from " + _storeFile.getName(), e);
@@ -443,6 +426,7 @@ public class FileSystemPreferencesProvid
                     if (_storeLock != null)
                     {
                         _storeLock.release();
+                        _storeLock.channel().close();
                     }
                 }
                 catch (IOException e)
@@ -452,22 +436,7 @@ public class FileSystemPreferencesProvid
                 finally
                 {
                     _storeLock = null;
-                    try
-                    {
-                        if (_storeRAF != null)
-                        {
-                            _storeRAF.close();
-                        }
-                    }
-                    catch (IOException e)
-                    {
-                        LOGGER.error("Cannot close preferences file", e);
-                    }
-                    finally
-                    {
-                        _storeRAF = null;
-                        _preferences.clear();
-                    }
+                    _preferences.clear();
                 }
             }
         }
@@ -544,16 +513,14 @@ public class FileSystemPreferencesProvid
             checkStoreOpened();
             try
             {
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                _objectMapper.writeValue(baos, _preferences);
-                FileChannel channel = _storeRAF.getChannel();
-                long currentSize = channel.size();
-                channel.position(0);
-                channel.write(ByteBuffer.wrap(baos.toByteArray()));
-                if (currentSize > baos.size())
+                _fileHelper.writeFileSafely(_storeFile.toPath(), new BaseAction<File, IOException>()
                 {
-                    channel.truncate(baos.size());
-                }
+                    @Override
+                    public void performAction(File file) throws IOException
+                    {
+                        _objectMapper.writeValue(file, _preferences);
+                    }
+                });
             }
             catch (IOException e)
             {
@@ -569,5 +536,32 @@ public class FileSystemPreferencesProvid
             }
         }
 
+        private void getFileLock(String lockFilePath)
+        {
+            File lockFile = new File(lockFilePath);
+            try
+            {
+                lockFile.createNewFile();
+                lockFile.deleteOnExit();
+
+                @SuppressWarnings("resource")
+                FileOutputStream out = new FileOutputStream(lockFile);
+                FileChannel channel = out.getChannel();
+                _storeLock = channel.tryLock();
+            }
+            catch (IOException ioe)
+            {
+                throw new IllegalStateException("Cannot create the lock file " + lockFile.getName(), ioe);
+            }
+            catch(OverlappingFileLockException e)
+            {
+                _storeLock = null;
+            }
+
+            if(_storeLock == null)
+            {
+                throw new IllegalStateException("Cannot get lock on file " + lockFile.getAbsolutePath() + ". Is another instance running?");
+            }
+        }
     }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/QpidServiceLoader.java Tue Mar  3 14:56:40 2015
@@ -19,8 +19,11 @@
 package org.apache.qpid.server.plugin;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.ServiceLoader;
 
 import org.apache.log4j.Logger;
@@ -47,6 +50,16 @@ public class QpidServiceLoader
         return instancesOf(clazz, true);
     }
 
+    public <C extends Pluggable> Map<String,C> getInstancesByType(Class<C> clazz)
+    {
+        Map<String,C> instances = new HashMap<>();
+        for(C instance : instancesOf(clazz))
+        {
+            instances.put(instance.getType(), instance);
+        }
+        return Collections.unmodifiableMap(instances);
+    }
+
     private <C extends Pluggable> Iterable<C> instancesOf(Class<C> clazz, boolean atLeastOne)
     {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Mar  3 14:56:40 2015
@@ -30,7 +30,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -52,6 +54,7 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -75,6 +78,8 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.QueueNotificationListener;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.plugin.MessageFilterFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.SecurityManager;
@@ -186,6 +191,9 @@ public abstract class AbstractQueue<X ex
     @ManagedAttributeField
     private MessageDurability _messageDurability;
 
+    @ManagedAttributeField
+    private Map<String, Map<String,List<String>>> _defaultFilters;
+
     private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
 
     private final Set<NotificationCheck> _notificationChecks =
@@ -241,12 +249,15 @@ public abstract class AbstractQueue<X ex
     private long _minimumMessageTtl;
     @ManagedAttributeField
     private long _maximumMessageTtl;
+    @ManagedAttributeField
+    private boolean _ensureNondestructiveConsumers;
 
     private final AtomicBoolean _recovering = new AtomicBoolean(true);
     private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
 
     private final QueueRunner _queueRunner = new QueueRunner(this);
     private boolean _closing;
+    private final ConcurrentMap<String,MessageFilter> _defaultFiltersMap = new ConcurrentHashMap<>();
 
     protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
     {
@@ -283,11 +294,7 @@ public abstract class AbstractQueue<X ex
                          });
         }
 
-        if (isDurable())
-        {
-            _virtualHost.getDurableConfigurationStore().create(asObjectRecord());
-        }
-        else if(getMessageDurability() != MessageDurability.NEVER)
+        if(!isDurable() && getMessageDurability() != MessageDurability.NEVER)
         {
             Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
                          new PrivilegedAction<Object>()
@@ -351,17 +358,9 @@ public abstract class AbstractQueue<X ex
 
                 case PRINCIPAL:
                     _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal();
-                    if(isDurable())
-                    {
-                        _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord());
-                    }
                     break;
                 case CONTAINER:
                     _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName();
-                    if(isDurable())
-                    {
-                        _virtualHost.getDurableConfigurationStore().update(false,asObjectRecord());
-                    }
                     break;
                 case CONNECTION:
                     _exclusiveOwner = sessionModel.getConnectionModel();
@@ -450,6 +449,40 @@ public abstract class AbstractQueue<X ex
         }
 
         _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
+
+        if(_defaultFilters != null)
+        {
+            QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
+            final Map<String, MessageFilterFactory> messageFilterFactories =
+                    qpidServiceLoader.getInstancesByType(MessageFilterFactory.class);
+
+            for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet())
+            {
+                String name = String.valueOf(entry.getKey());
+                Map<String, List<String>> filterValue = entry.getValue();
+                if(filterValue.size() == 1)
+                {
+                    String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
+                    MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
+                    if(filterFactory != null)
+                    {
+                        List<String> filterArguments = filterValue.values().iterator().next();
+                        _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
+                    }
+                    else
+                    {
+                        throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + messageFilterFactories.keySet());
+                    }
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterValue);
+
+                }
+
+            }
+        }
+
         updateAlertChecks();
     }
 
@@ -555,6 +588,12 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
+    public Map<String, Map<String, List<String>>> getDefaultFilters()
+    {
+        return _defaultFilters;
+    }
+
+    @Override
     public final MessageDurability getMessageDurability()
     {
         return _messageDurability;
@@ -573,6 +612,14 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
+    public boolean isEnsureNondestructiveConsumers()
+    {
+        return _ensureNondestructiveConsumers;
+    }
+
+
+
+    @Override
     public Collection<String> getAvailableAttributes()
     {
         return new ArrayList<String>(_arguments.keySet());
@@ -603,7 +650,7 @@ public abstract class AbstractQueue<X ex
 
     @Override
     public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
-                                     final FilterManager filters,
+                                     FilterManager filters,
                                      final Class<? extends ServerMessage> messageClass,
                                      final String consumerName,
                                      EnumSet<ConsumerImpl.Option> optionSet)
@@ -699,6 +746,26 @@ public abstract class AbstractQueue<X ex
         {
             throw new ExistingConsumerPreventsExclusive();
         }
+        if(!_defaultFiltersMap.isEmpty())
+        {
+            if(filters == null)
+            {
+                filters = new FilterManager();
+            }
+            for (Map.Entry<String,MessageFilter> filter : _defaultFiltersMap.entrySet())
+            {
+                if(!filters.hasFilter(filter.getKey()))
+                {
+                    filters.add(filter.getKey(), filter.getValue());
+                }
+            }
+        }
+
+        if(_ensureNondestructiveConsumers)
+        {
+            optionSet = EnumSet.copyOf(optionSet);
+            optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+        }
 
         QueueConsumerImpl consumer = new QueueConsumerImpl(this,
                                                            target,

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Tue Mar  3 14:56:40 2015
@@ -62,11 +62,16 @@ public class QueueArgumentsConverter
 
     public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
 
+    public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
+
+    public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
     /**
      * No-local queue argument is used to support the no-local feature of Durable Subscribers.
      */
     public static final String QPID_NO_LOCAL = "no-local";
+
     static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+
     static
     {
         ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
@@ -99,6 +104,8 @@ public class QueueArgumentsConverter
 
         ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
         ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
+        ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
+        ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
 
     }
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar  3 14:56:40 2015
@@ -371,11 +371,16 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    private void dequeue()
+    private boolean dequeue()
     {
         EntryState state = _state;
 
-        if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+        while(state.getState() == State.ACQUIRED && !_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
+        {
+            state = _state;
+        }
+
+        if(state.getState() == State.ACQUIRED)
         {
             if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
             {
@@ -387,7 +392,11 @@ public abstract class QueueEntryImpl imp
             {
                 notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
             }
-
+            return true;
+        }
+        else
+        {
+            return false;
         }
 
     }
@@ -420,9 +429,10 @@ public abstract class QueueEntryImpl imp
 
     public void delete()
     {
-        dequeue();
-
-        dispose();
+        if(dequeue())
+        {
+            dispose();
+        }
     }
 
     public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStore.java Tue Mar  3 14:56:40 2015
@@ -62,7 +62,7 @@ public interface FileKeyStore<X extends
     @ManagedAttribute(defaultValue = "${this:path}")
     String getDescription();
 
-    @ManagedAttribute(  mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT)
+    @ManagedAttribute(  mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*")
     String getStoreUrl();
 
     @DerivedAttribute

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java?rev=1663717&r1=1663716&r2=1663717&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStore.java Tue Mar  3 14:56:40 2015
@@ -31,7 +31,7 @@ public interface NonJavaKeyStore<X exten
     @ManagedAttribute(defaultValue = "${this:subjectName}")
     String getDescription();
 
-    @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT )
+    @ManagedAttribute( mandatory = true, secure = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT, secureValueFilter = "^data\\:.*")
     String getPrivateKeyUrl();
 
     @ManagedAttribute( mandatory = true, oversize = true, oversizedAltText = OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT )



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message