qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lqu...@apache.org
Subject svn commit: r1714011 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/security/ main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/security/ test/java/org/apache/qpid/server/virtualhost/
Date Thu, 12 Nov 2015 10:46:22 GMT
Author: lquack
Date: Thu Nov 12 10:46:22 2015
New Revision: 1714011

URL: http://svn.apache.org/viewvc?rev=1714011&view=rev
Log:
QPID-6829: [Java Broker] Implement AbstractSystemMessageSource$Consumer#flush()

Added:
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1714011&r1=1714010&r2=1714011&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Thu Nov 12 10:46:22 2015
@@ -48,7 +48,6 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class TrustStoreMessageSource extends AbstractSystemMessageSource implements MessageSource
 {
@@ -59,7 +58,7 @@ public class TrustStoreMessageSource ext
     private final VirtualHost<?, ?, ?> _virtualHost;
 
 
-    public TrustStoreMessageSource(final TrustStore<?> trustStore, final VirtualHostImpl virtualHost)
+    public TrustStoreMessageSource(final TrustStore<?> trustStore, final VirtualHost<?, ?, ?> virtualHost)
     {
         super(getSourceNameFromTrustStore(trustStore), virtualHost);
         _virtualHost = virtualHost;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1714011&r1=1714010&r2=1714011&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov 12 10:46:22 2015
@@ -43,6 +43,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -55,7 +56,7 @@ public abstract class AbstractSystemMess
     private List<Consumer> _consumers = new CopyOnWriteArrayList<>();
 
     public AbstractSystemMessageSource(
-            String name, final VirtualHostImpl virtualHost)
+            String name, final VirtualHost<?, ?, ?> virtualHost)
     {
         _name = name;
         _id = UUID.nameUUIDFromBytes((getClass().getSimpleName() + "/" + virtualHost.getName() + "/" + name).getBytes(
@@ -244,7 +245,17 @@ public abstract class AbstractSystemMess
         @Override
         public void flush()
         {
-
+            AMQPConnection<?> connection = getSessionModel().getAMQPConnection();
+            try
+            {
+                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
+                deliverMessages();
+                _target.processPending();
+            }
+            finally
+            {
+                connection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
+            }
         }
 
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1714011&r1=1714010&r2=1714011&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Thu Nov 12 10:46:22 2015
@@ -32,15 +32,16 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.model.VirtualHost;
 
 public class VirtualHostPropertiesNode extends AbstractSystemMessageSource
 {
 
-    public VirtualHostPropertiesNode(final VirtualHostImpl virtualHost)
+    public VirtualHostPropertiesNode(final VirtualHost<?, ?, ?> virtualHost)
     {
         this(virtualHost, "$virtualhostProperties");
     }
-    public VirtualHostPropertiesNode(final VirtualHostImpl virtualHost, String name)
+    public VirtualHostPropertiesNode(final VirtualHost<?, ?, ?> virtualHost, String name)
     {
         super(name, virtualHost);
     }

Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1714011&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Thu Nov 12 10:46:22 2015
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.security;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateEncodingException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.mockito.ArgumentCaptor;
+
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+public class TrustStoreMessageSourceTest extends QpidTestCase
+{
+    private TrustStoreMessageSource _trustStoreMessageSource;
+    private Certificate[] _certificates;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        VirtualHost vhost = mock(VirtualHost.class);
+        MessageStore messageStore = new TestMemoryMessageStore();
+        TrustStore trustStore = mock(TrustStore.class);
+        Certificate certificate = mock(Certificate.class);
+        _certificates = new Certificate[]{certificate};
+
+        when(vhost.getMessageStore()).thenReturn(messageStore);
+        when(trustStore.getState()).thenReturn(State.ACTIVE);
+        when(trustStore.getCertificates()).thenReturn(_certificates);
+        when(certificate.getEncoded()).thenReturn("my certificate".getBytes());
+        _trustStoreMessageSource= new TrustStoreMessageSource(trustStore, vhost);
+    }
+
+    public void testAddConsumer() throws Exception
+    {
+        final EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        final ConsumerTarget target = mock(ConsumerTarget.class);
+        when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
+
+        _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options);
+
+        ArgumentCaptor<MessageInstance> argumentCaptor = ArgumentCaptor.forClass(MessageInstance.class);
+        verify(target).send(any(ConsumerImpl.class), argumentCaptor.capture(), anyBoolean());
+        final ServerMessage message = argumentCaptor.getValue().getMessage();
+        assertCertificates(getCertificatesFromMessage(message));
+    }
+
+    private void assertCertificates(final List<String> encodedCertificates) throws CertificateEncodingException
+    {
+        for (int i = 0; i < _certificates.length; ++i)
+        {
+            assertArrayEquals("Unexpected content", _certificates[i].getEncoded(), encodedCertificates.get(i).getBytes());
+        }
+    }
+
+    private List<String> getCertificatesFromMessage(final ServerMessage message) throws ClassNotFoundException
+    {
+        final int bodySize = (int) message.getSize();
+        byte[] msgContent = new byte[bodySize];
+        ByteBuffer buf = ByteBuffer.wrap(msgContent);
+        int stored = message.getContent(buf);
+        assertEquals("Unexpected message size was retrieved", bodySize, stored);
+
+        List<String> certificates = new ArrayList<>();
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(msgContent);
+        try (ObjectInputStream is = new ObjectInputStream(bytesIn))
+        {
+            ArrayList<byte[]> encodedCertificates = (ArrayList<byte[]>) is.readObject();
+            for(byte[] encodedCertificate : encodedCertificates)
+            {
+                certificates.add(new String(encodedCertificate));
+            }
+            is.close();
+        }
+        catch (IOException e)
+        {
+            fail("Unexpected IO Exception on operation: " + e.getMessage());
+        }
+        return certificates;
+    }
+}

Added: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1714011&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (added)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Thu Nov 12 10:46:22 2015
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.virtualhost;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.EnumSet;
+
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class VirtualHostPropertiesNodeTest extends QpidTestCase
+{
+    private VirtualHostPropertiesNode _virtualHostPropertiesNode;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        VirtualHost vhost = mock(VirtualHost.class);
+        MessageStore messageStore = new TestMemoryMessageStore();
+        when(vhost.getMessageStore()).thenReturn(messageStore);
+
+        _virtualHostPropertiesNode = new VirtualHostPropertiesNode(vhost);
+    }
+
+    public void testAddConsumer() throws Exception
+    {
+        final EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        final ConsumerTarget target = mock(ConsumerTarget.class);
+        when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
+
+        _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options);
+        verify(target).send(any(ConsumerImpl.class), any(MessageInstance.class), anyBoolean());
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message