qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject svn commit: r1634713 - in /qpid/trunk/qpid/java: bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/store/ broker-core/src/test/java/org/apache/qpid/server/store/
Date Mon, 27 Oct 2014 22:32:30 GMT
Author: orudyy
Date: Mon Oct 27 22:32:29 2014
New Revision: 1634713

URL: http://svn.apache.org/r1634713
Log:
QPID-5650: Preserve alternate exchange on upgrade of queue with 'dead letter queue'

Added:
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
Modified:
    qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1634713&r1=1634712&r2=1634713&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Mon Oct 27 22:32:29 2014
@@ -360,9 +360,6 @@ public class BDBUpgradeTest extends Qpid
     }
 
     /**
-     *
-     * TODO (QPID-5650) Resolve so this test can be reenabled.
-     *
      * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
      * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
      * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
@@ -370,7 +367,7 @@ public class BDBUpgradeTest extends Qpid
      * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
      * that turned it on for this specific queue.
      */
-    public void xtestRecoveryOfQueueWithDLQ() throws Exception
+    public void testRecoveryOfQueueWithDLQ() throws Exception
     {
         JMXTestUtils jmxUtils = null;
         try

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1634713&r1=1634712&r2=1634713&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Mon Oct 27 22:32:29 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.filter.FilterSupport;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
@@ -36,6 +39,7 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class VirtualHostStoreUpgraderAndRecoverer
 {
@@ -346,6 +350,11 @@ public class VirtualHostStoreUpgraderAnd
 
     private class Upgrader_0_4_to_2_0 extends StoreUpgraderPhase
     {
+        private static final String ARGUMENTS = "arguments";
+        private static final String DLQ_ENABLED_ARGUMENT = "x-qpid-dlq-enabled";
+        private static final String ALTERNATE_EXCHANGE = "alternateExchange";
+        private static final String VIRTUAL_HOST_DLQ_ENABLED = "queue.deadLetterQueueEnabled";
+
         private Map<String, String> _missingAmqpExchanges = new HashMap<String, String>(DEFAULT_EXCHANGES);
         private ConfiguredObjectRecord _virtualHostRecord;
 
@@ -372,12 +381,24 @@ public class VirtualHostStoreUpgraderAnd
                 String name = (String)attributes.get("name");
                 _missingAmqpExchanges.remove(name);
             }
-            getNextUpgrader().configuredObject(record);
+            getUpdateMap().put(record.getId(), record);
         }
 
         @Override
         public void complete()
         {
+            boolean virtualHostDLQEnabled =  Boolean.parseBoolean(String.valueOf(_virtualHostRecord.getAttributes().get(VIRTUAL_HOST_DLQ_ENABLED)));
+            for (Iterator<Map.Entry<UUID, ConfiguredObjectRecord>> iterator = getUpdateMap().entrySet().iterator(); iterator.hasNext();)
+            {
+                Map.Entry<UUID, ConfiguredObjectRecord> entry = iterator.next();
+                ConfiguredObjectRecord record = entry.getValue();
+                if ("Queue".equals(record.getType()))
+                {
+                    record = upgradeQueueRecordIfNecessary(record, virtualHostDLQEnabled);
+                }
+                getNextUpgrader().configuredObject(record);
+            }
+
             for (Entry<String, String> entry : _missingAmqpExchanges.entrySet())
             {
                 String name = entry.getKey();
@@ -399,6 +420,54 @@ public class VirtualHostStoreUpgraderAnd
             getNextUpgrader().complete();
         }
 
+        private ConfiguredObjectRecord upgradeQueueRecordIfNecessary(ConfiguredObjectRecord record, boolean _virtualHostDLQEnabled)
+        {
+            Map<String, Object> attributes = new LinkedHashMap<>(record.getAttributes());
+            boolean queueArgumentDQLEnabledSet = false;
+            boolean queueDLQEnabled = false;
+
+            if (attributes.get(ARGUMENTS) instanceof Map)
+            {
+                Map<String,Object> arguments = (Map<String,Object>)attributes.get(ARGUMENTS);
+                queueArgumentDQLEnabledSet = arguments.containsKey(DLQ_ENABLED_ARGUMENT);
+                queueDLQEnabled = queueArgumentDQLEnabledSet ? Boolean.parseBoolean(String.valueOf(arguments.get(DLQ_ENABLED_ARGUMENT))) : false;
+            }
+
+            if( ((queueArgumentDQLEnabledSet && queueDLQEnabled) || (!queueArgumentDQLEnabledSet && _virtualHostDLQEnabled )) && attributes.get("alternateExchange") == null)
+            {
+                Object queueName =  attributes.get("name");
+
+                if (queueName == null || "".equals(queueName))
+                {
+                    throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes);
+                }
+
+                String dleSuffix = System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
+                ConfiguredObjectRecord alternateExchange = findConfiguredObjectRecord("Exchange", queueName + dleSuffix);
+
+                if (alternateExchange != null)
+                {
+                    attributes.put(ALTERNATE_EXCHANGE, alternateExchange.getId());
+                    record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
+                    getUpdateMap().put(record.getId(), record);
+                }
+            }
+            return record;
+        }
+
+        private ConfiguredObjectRecord findConfiguredObjectRecord(String type, String name)
+        {
+            Collection<ConfiguredObjectRecord> records = getUpdatedRecords().values();
+            for(ConfiguredObjectRecord record: records)
+            {
+                if (type.equals(record.getType()) && name.equals(record.getAttributes().get("name")))
+                {
+                    return record;
+                }
+            }
+            return null;
+        }
+
     }
 
     private class Upgrader_2_0_to_2_1 extends StoreUpgraderPhase

Added: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java?rev=1634713&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java Mon Oct 27 22:32:29 2014
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.server.virtualhostnode.TestVirtualHostNode;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
+{
+    private ConfiguredObjectRecord _hostRecord;
+    private CurrentThreadTaskExecutor _taskExecutor;
+    private UUID _hostId;
+    private VirtualHostNode _virtualHostNode;
+    private DurableConfigurationStore _durableConfigurationStore;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        UUID hostParentId = UUID.randomUUID();
+        _hostId = UUID.randomUUID();
+        Map<String, Object> hostAttributes = new HashMap<>();
+        hostAttributes.put("modelVersion", "0.0");
+        hostAttributes.put("name", "test");
+        hostAttributes.put("type", TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+
+        _hostRecord = mock(ConfiguredObjectRecord.class);
+        when(_hostRecord.getId()).thenReturn(_hostId);
+        when(_hostRecord.getAttributes()).thenReturn(hostAttributes);
+        when(_hostRecord.getType()).thenReturn("VirtualHost");
+        when(_hostRecord.toString()).thenReturn("VirtualHost[name='test',id='" + _hostId + "']");
+
+        _taskExecutor = new CurrentThreadTaskExecutor();
+        _taskExecutor.start();
+
+        SystemConfig<?> systemConfig = mock(SystemConfig.class);
+        when(systemConfig.getEventLogger()).thenReturn(new EventLogger());
+
+        Broker<?> broker = mock(Broker.class);
+        when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
+        when(broker.getTaskExecutor()).thenReturn(_taskExecutor);
+        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
+
+        _durableConfigurationStore = mock(DurableConfigurationStore.class);
+        Map<String,Object> attributes = new HashMap<>();
+        attributes.put(VirtualHostNode.ID, hostParentId);
+        attributes.put(VirtualHostNode.NAME, "test");
+        _virtualHostNode = new TestVirtualHostNode(broker, attributes, _durableConfigurationStore);
+    }
+
+    @Override
+    public void tearDown()throws Exception
+    {
+        super.tearDown();
+        _taskExecutor.stopImmediately();
+    }
+
+    public void testRecoverQueueWithDLQEnabled() throws Exception
+    {
+        ConfiguredObjectRecord queue = mockQueue("test", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "true"));
+        ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
+        ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
+        ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
+        ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
+        when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
+        ConfiguredObjectRecord queueBinding =  mockBinding("test", queue, directExchange);
+        setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
+
+        VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+        upgraderAndRecoverer.perform(_durableConfigurationStore);
+
+        VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
+        host.open();
+
+        assertNotNull("Virtual host is not recovered", host);
+        Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");
+        assertNotNull("Queue is not recovered", recoveredQueue);
+
+        Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ");
+        assertNotNull("DLQ queue is not recovered", recoveredDLQ);
+
+        Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE");
+        assertNotNull("DLE exchange is not recovered", recoveredDLE);
+
+        assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange());
+    }
+
+    public void testRecoverQueueWithDLQEnabledOnVirtualHost() throws Exception
+    {
+        _hostRecord.getAttributes().put(VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, "true");
+
+        ConfiguredObjectRecord queue = mockQueue("test", null);
+        ConfiguredObjectRecord dlq = mockQueue("test_DLQ", Collections.<String,Object>singletonMap("x-qpid-dlq-enabled", "false"));
+        ConfiguredObjectRecord dle = mockExchange("test_DLE", "fanout");
+        ConfiguredObjectRecord dlqBinding = mockBinding("dlq", dlq, dle);
+        ConfiguredObjectRecord directExchange = mock(ConfiguredObjectRecord.class);
+        when(directExchange.getId()).thenReturn(UUIDGenerator.generateExchangeUUID("amq.direct", "test"));
+        ConfiguredObjectRecord queueBinding =  mockBinding("test", queue, directExchange);
+        setUpVisit(_hostRecord, queue, dlq, dle, queueBinding, dlqBinding);
+
+        VirtualHostStoreUpgraderAndRecoverer upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
+        upgraderAndRecoverer.perform(_durableConfigurationStore);
+
+        VirtualHost<?,?,?>  host = _virtualHostNode.getVirtualHost();
+        host.open();
+
+        assertNotNull("Virtual host is not recovered", host);
+        Queue<?> recoveredQueue = host.findConfiguredObject(Queue.class, "test");
+        assertNotNull("Queue is not recovered", recoveredQueue);
+
+        Queue<?> recoveredDLQ = host.findConfiguredObject(Queue.class, "test_DLQ");
+        assertNotNull("DLQ queue is not recovered", recoveredDLQ);
+
+        Exchange<?> recoveredDLE = host.findConfiguredObject(Exchange.class, "test_DLE");
+        assertNotNull("DLE exchange is not recovered", recoveredDLE);
+
+        assertEquals("Unexpected alternative exchange", recoveredDLE, recoveredQueue.getAlternateExchange());
+    }
+
+    private ConfiguredObjectRecord mockBinding(String bindingName, ConfiguredObjectRecord queue, ConfiguredObjectRecord exchange)
+    {
+        ConfiguredObjectRecord binding = mock(ConfiguredObjectRecord.class);
+        when(binding.getId()).thenReturn(UUID.randomUUID());
+        when(binding.getType()).thenReturn("org.apache.qpid.server.model.Binding");
+        Map<String,UUID> parents = new HashMap<>();
+        parents.put("Queue", queue.getId());
+        parents.put("Exchange", exchange.getId());
+        when(binding.getParents()).thenReturn(parents);
+        when(binding.toString()).thenReturn("Binding[" + bindingName + "]");
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("durable", true);
+        attributes.put("name", bindingName);
+        when(binding.getAttributes()).thenReturn(attributes);
+        return binding;
+    }
+
+    private ConfiguredObjectRecord mockExchange(String exchangeName, String exchangeType)
+    {
+        ConfiguredObjectRecord exchange = mock(ConfiguredObjectRecord.class);
+        when(exchange.getId()).thenReturn(UUID.randomUUID());
+        when(exchange.getType()).thenReturn("org.apache.qpid.server.model.Exchange");
+        when(exchange.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId));
+        when(exchange.toString()).thenReturn("Exchange[" + exchangeName + "]");
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("type", exchangeType);
+        attributes.put("durable", true);
+        attributes.put("name", exchangeName);
+        when(exchange.getAttributes()).thenReturn(attributes);
+        return exchange;
+    }
+
+    private ConfiguredObjectRecord mockQueue(String queueName, Map<String, Object> arguments)
+    {
+        ConfiguredObjectRecord queue = mock(ConfiguredObjectRecord.class);
+        when(queue.getId()).thenReturn(UUID.randomUUID());
+        when(queue.getType()).thenReturn("org.apache.qpid.server.model.Queue");
+        when(queue.getParents()).thenReturn(Collections.singletonMap("VirtualHost", _hostId));
+        when(queue.toString()).thenReturn("Queue[" + queueName + "]");
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("durable", true);
+        attributes.put("name", queueName);
+        if (arguments != null)
+        {
+            attributes.put("arguments", arguments);
+        }
+        when(queue.getAttributes()).thenReturn(attributes);
+        return queue;
+    }
+
+
+    private void setUpVisit(final ConfiguredObjectRecord... records)
+    {
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable
+            {
+                Iterator<ConfiguredObjectRecord> iterator = asList(records).iterator();
+                ConfiguredObjectRecordHandler handler = (ConfiguredObjectRecordHandler) invocation.getArguments()[0];
+                handler.begin();
+                boolean handlerContinue = true;
+                while(iterator.hasNext() && handlerContinue)
+                {
+                    handlerContinue = handler.handle(iterator.next());
+                }
+                handler.end();
+                return null;
+            }
+        }).when(_durableConfigurationStore).visitConfiguredObjectRecords(any(ConfiguredObjectRecordHandler.class));
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message