qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r747754 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/configuration/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/queue/
Date Wed, 25 Feb 2009 11:32:24 GMT
Author: ritchiem
Date: Wed Feb 25 11:32:24 2009
New Revision: 747754

URL: http://svn.apache.org/viewvc?rev=747754&view=rev
Log:
QPID-1634 : Created QueueBackingStore interface and implementation FileQueueBackingStore.
Tested by FileQueueBackingStoreTest.

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=747754&r1=747753&r2=747754&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Wed Feb 25 11:32:24 2009
@@ -28,6 +28,7 @@
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
 
@@ -71,7 +72,7 @@
 
     public VirtualHostConfiguration(String name, Configuration mungedConf) throws ConfigurationException
     {
-        this(name,mungedConf, null);
+        this(name,mungedConf,  new ServerConfiguration(new PropertiesConfiguration()));
     }
 
     public String getName()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=747754&r1=747753&r2=747754&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Wed Feb 25 11:32:24 2009
@@ -113,6 +113,9 @@
     void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
             throws AMQException;
 
+    void recoverFromMessageMetaData(MessageMetaData mmd);
+
+    void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException;
 
 
     String toString();

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java?rev=747754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
(added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
Wed Feb 25 11:32:24 2009
@@ -0,0 +1,419 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.AMQException;
+import org.apache.commons.configuration.ConfigurationException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class FileQueueBackingStore implements QueueBackingStore
+{
+    private static final Logger _log = Logger.getLogger(FileQueueBackingStore.class);
+
+    private AtomicBoolean _closed = new AtomicBoolean(false);
+    private String _flowToDiskLocation;
+    private static final String QUEUE_BACKING_DIR = "queueBacking";
+
+    public void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException
+    {
+        setFlowToDisk(virtualHost.getName(), config.getFlowToDiskLocation());
+    }
+
+    private void setFlowToDisk(String vHostName, String location) throws ConfigurationException
+    {
+        if (vHostName == null)
+        {
+            throw new ConfigurationException("Unable to setup to Flow to Disk as Virtualhost name was not specified");
+        }
+
+        if (location == null)
+        {
+            throw new ConfigurationException("Unable to setup to Flow to Disk as location was not specified.");
+        }
+
+        _flowToDiskLocation = location;
+
+        _flowToDiskLocation += File.separator + QUEUE_BACKING_DIR + File.separator + vHostName;
+
+        File root = new File(location);
+        if (!root.exists())
+        {
+            throw new ConfigurationException("Specified Flow to Disk root does not exist:" + root.getAbsolutePath());
+        }
+        else
+        {
+
+            if (root.isFile())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified root is a file:"+
+                           root.getAbsolutePath());
+            }
+
+            if(!root.canWrite())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to specified root:"+
+                           root.getAbsolutePath());
+            }
+
+        }
+
+
+        File store = new File(_flowToDiskLocation);
+        if (store.exists())
+        {
+            if (!FileUtils.delete(store, true))
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as directory already exsits:"
+                           + store.getAbsolutePath());
+            }
+
+            if (store.isFile())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store as specified location is a file:"+
+                           store.getAbsolutePath());
+            }
+
+        }
+        else
+        {
+            if (!store.getParentFile().getParentFile().canWrite())
+            {
+                throw new ConfigurationException("Unable to create Temporary Flow to Disk store. Unable to write to parent location:"+
+                           store.getParentFile().getParentFile().getAbsolutePath());
+            }
+        }
+
+
+        _log.info("Creating Flow to Disk Store : " + store.getAbsolutePath());
+        store.deleteOnExit();
+        if (!store.mkdirs())
+        {
+            throw new ConfigurationException("Unable to create Temporary Flow to Disk store:" + store.getAbsolutePath());
+        }
+    }
+
+
+    public AMQMessage recover(Long messageId)
+     {
+         MessageMetaData mmd;
+         List<ContentChunk> contentBodies = new LinkedList<ContentChunk>();
+
+         File handle = getFileHandle(messageId);
+         handle.deleteOnExit();
+
+         ObjectInputStream input = null;
+
+         Exception error = null;
+         try
+         {
+             input = new ObjectInputStream(new FileInputStream(handle));
+
+             long arrivaltime = input.readLong();
+
+             final AMQShortString exchange = new AMQShortString(input.readUTF());
+             final AMQShortString routingKey = new AMQShortString(input.readUTF());
+             final boolean mandatory = input.readBoolean();
+             final boolean immediate = input.readBoolean();
+
+             int bodySize = input.readInt();
+             byte[] underlying = new byte[bodySize];
+
+             input.readFully(underlying, 0, bodySize);
+
+             ByteBuffer buf = ByteBuffer.wrap(underlying);
+
+             ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, bodySize);
+
+             int chunkCount = input.readInt();
+
+             // There are WAY to many annonymous MPIs in the code this should be made concrete.
+             MessagePublishInfo info = new MessagePublishInfo()
+             {
+
+                 public AMQShortString getExchange()
+                 {
+                     return exchange;
+                 }
+
+                 public void setExchange(AMQShortString exchange)
+                 {
+
+                 }
+
+                 public boolean isImmediate()
+                 {
+                     return immediate;
+                 }
+
+                 public boolean isMandatory()
+                 {
+                     return mandatory;
+                 }
+
+                 public AMQShortString getRoutingKey()
+                 {
+                     return routingKey;
+                 }
+             };
+
+             mmd = new MessageMetaData(info, chb, chunkCount);
+             mmd.setArrivalTime(arrivaltime);
+
+             AMQMessage message;
+             if (((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2)
+             {
+                 message = new PersistentAMQMessage(messageId, null);
+             }
+             else
+             {
+                 message = new TransientAMQMessage(messageId);
+             }
+
+             message.recoverFromMessageMetaData(mmd);
+
+             for (int chunk = 0; chunk < chunkCount; chunk++)
+             {
+                 int length = input.readInt();
+
+                 byte[] data = new byte[length];
+
+                 input.readFully(data, 0, length);
+
+                 // There are WAY to many annonymous CCs in the code this should be made concrete.
+                 try
+                 {
+                     message.recoverContentBodyFrame(new RecoverDataBuffer(length, data), (chunk + 1 == chunkCount));
+                 }
+                 catch (AMQException e)
+                 {
+                     //ignore as this will not occur.
+                     // It is thrown by the _transactionLog method in recover on PersistentAMQMessage
+                     // but we have created the message with a null log and will never call that method.
+                 }
+             }
+
+             return message;
+         }
+         catch (Exception e)
+         {
+             error = e;
+         }
+         finally
+         {
+             try
+             {
+                 input.close();
+             }
+             catch (IOException e)
+             {
+                 _log.info("Unable to close input on message("+messageId+") recovery due to:"+e.getMessage());
+             }
+         }
+
+        throw new UnableToRecoverMessageException(error);
+    }
+
+
+    public void flow(AMQMessage message) throws UnableToFlowMessageException
+    {
+        long messageId = message.getMessageId();
+
+        File handle = getFileHandle(messageId);
+
+        //If we have written the data once then we don't need to do it again.
+        if (handle.exists())
+        {
+            _log.debug("Message(" + messageId + ") already flowed to disk.");
+            return;
+        }
+
+        handle.deleteOnExit();
+
+        ObjectOutputStream writer = null;
+        Exception error = null;
+
+        try
+        {
+            writer = new ObjectOutputStream(new FileOutputStream(handle));
+
+            writer.writeLong(message.getArrivalTime());
+
+            MessagePublishInfo mpi = message.getMessagePublishInfo();
+            writer.writeUTF(String.valueOf(mpi.getExchange()));
+            writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
+            writer.writeBoolean(mpi.isMandatory());
+            writer.writeBoolean(mpi.isImmediate());
+            ContentHeaderBody chb = message.getContentHeaderBody();
+
+            // write out the content header body
+            final int bodySize = chb.getSize();
+            byte[] underlying = new byte[bodySize];
+            ByteBuffer buf = ByteBuffer.wrap(underlying);
+            chb.writePayload(buf);
+
+            writer.writeInt(bodySize);
+            writer.write(underlying, 0, bodySize);
+
+            int bodyCount = message.getBodyCount();
+            writer.writeInt(bodyCount);
+
+            //WriteContentBody
+            for (int index = 0; index < bodyCount; index++)
+            {
+                ContentChunk chunk = message.getContentChunk(index);
+                chunk.reduceToFit();
+
+                byte[] chunkData = chunk.getData().array();
+
+                int length = chunk.getSize();
+                writer.writeInt(length);
+                writer.write(chunkData, 0, length);
+            }
+        }
+        catch (FileNotFoundException e)
+        {
+            error = e;
+        }
+        catch (IOException e)
+        {
+            error = e;
+        }
+        finally
+        {
+            try
+            {
+                writer.flush();
+                writer.close();
+            }
+            catch (IOException e)
+            {
+                error = e;
+            }
+        }
+
+        if (error != null)
+        {
+            _log.error("Unable to flow message(" + messageId + ") to disk, restoring state.");
+            handle.delete();
+            throw new UnableToFlowMessageException(messageId, error);
+        }
+    }
+
+    /**
+     * Use the messageId to calculate the file path on disk.
+     *
+     * Current implementation will give us 256 bins.
+     * Therefore the maximum messages that can be flowed before error/platform is:
+     * ext3 : 256 bins * 32000  = 8192000
+     * FAT32 : 256 bins * 65534 = 16776704
+     * Other FS have much greater limits than we need to worry about.
+     *
+     * @param messageId the Message we need a file Handle for.
+     *
+     * @return the File handle
+     */
+    private File getFileHandle(long messageId)
+    {
+        // grab the 8 LSB to give us 256 bins
+        long bin = messageId & 0xFFL;
+
+        String bin_path =_flowToDiskLocation + File.separator + bin;
+        File bin_dir = new File(bin_path);
+
+        if (!bin_dir.exists())
+        {
+            bin_dir.mkdirs();
+            bin_dir.deleteOnExit();
+        }
+
+        String id = bin_path + File.separator + messageId;
+
+        return new File(id);
+    }
+
+    public void delete(Long messageId)
+    {
+        String id = String.valueOf(messageId);
+        File handle = new File(_flowToDiskLocation, id);
+
+        if (handle.exists())
+        {
+            _log.debug("Message(" + messageId + ") delete flowToDisk.");
+            if (!handle.delete())
+            {
+                throw new RuntimeException("Unable to delete flowToDisk data");
+            }
+        }
+    }
+
+    private class RecoverDataBuffer implements ContentChunk
+    {
+        private int _length;
+        private ByteBuffer _dataBuffer;
+
+        public RecoverDataBuffer(int length, byte[] data)
+        {
+            _length = length;
+            _dataBuffer = ByteBuffer.wrap(data);
+        }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public ByteBuffer getData()
+        {
+            return _dataBuffer;
+        }
+
+        public void reduceToFit()
+        {
+
+        }
+
+    }
+
+}
+

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java?rev=747754&r1=747753&r2=747754&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
Wed Feb 25 11:32:24 2009
@@ -52,7 +52,8 @@
             throws AMQException
     {
         super.setPublishAndContentHeaderBody(storeContext, messagePublishInfo, contentHeaderBody);
-        MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody, _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
+        MessageMetaData mmd = new MessageMetaData(messagePublishInfo, contentHeaderBody,
+                                                  _contentBodies == null ? 0 : _contentBodies.size(), _arrivalTime);
 
         _transactionLog.storeMessageMetaData(storeContext, _messageId, mmd);
     }
@@ -63,13 +64,7 @@
         return true;
     }
 
-    public void recoverFromMessageMetaData(MessageMetaData mmd)
-    {
-        _arrivalTime = mmd.getArrivalTime();
-        _contentHeaderBody = mmd.getContentHeaderBody();
-        _messagePublishInfo = mmd.getMessagePublishInfo();
-    }
-
+    @Override
     public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
     {
         super.addContentBodyFrame(null, contentChunk, isLastContentBody);

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java?rev=747754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
(added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
Wed Feb 25 11:32:24 2009
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.ConfigurationException;
+
+public interface QueueBackingStore
+{
+    void configure(VirtualHost virtualHost, VirtualHostConfiguration config) throws ConfigurationException;
+
+    AMQMessage recover(Long messageId);
+
+    void flow(AMQMessage message) throws UnableToFlowMessageException;
+
+    void delete(Long messageId);
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java?rev=747754&r1=747753&r2=747754&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
Wed Feb 25 11:32:24 2009
@@ -339,27 +339,48 @@
             throw new NullPointerException("PublishInfo cannot be null");
         }
 
-        _messagePublishInfo = messagePublishInfo;
+        _arrivalTime = System.currentTimeMillis();
+
+
         _contentHeaderBody = contentHeaderBody;
+        _messagePublishInfo = messagePublishInfo;
+
+        updateHeaderAndFlags();
+    }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
+    public void recoverFromMessageMetaData(MessageMetaData mmd)
+    {
+        _arrivalTime = mmd.getArrivalTime();
+        _contentHeaderBody = mmd.getContentHeaderBody();
+        _messagePublishInfo = mmd.getMessagePublishInfo();
 
-        if (contentHeaderBody.bodySize == 0)
+        updateHeaderAndFlags();
+    }
+
+    private void updateHeaderAndFlags()
+    {
+        if (_contentHeaderBody.bodySize == 0)
         {
             _contentBodies = Collections.EMPTY_LIST;
         }
 
-        _arrivalTime = System.currentTimeMillis();
-
-        if (messagePublishInfo.isImmediate())
+        if (_messagePublishInfo.isImmediate())
         {
             _flags |= IMMEDIATE;
         }
     }
 
-    public long getArrivalTime()
+    public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
     {
-        return _arrivalTime;
+        addContentBodyFrame(null, contentChunk, isLastContentBody);
     }
 
+
     public String toString()
     {
         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java?rev=747754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
(added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToFlowMessageException.java
Wed Feb 25 11:32:24 2009
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public class UnableToFlowMessageException extends Exception
+{
+    public UnableToFlowMessageException(long messageId, Exception error)
+    {
+        super("Unable to Flow Message:"+messageId, error);
+    }
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java?rev=747754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
(added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnableToRecoverMessageException.java
Wed Feb 25 11:32:24 2009
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public class UnableToRecoverMessageException extends RuntimeException
+{
+    public UnableToRecoverMessageException(Exception error)
+    {
+        super(error);
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=747754&r1=747753&r2=747754&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
Wed Feb 25 11:32:24 2009
@@ -84,7 +84,9 @@
     private ACLManager _accessManager;
 
     private final Timer _houseKeepingTimer;
-     
+    
+    private VirtualHostConfiguration _configuration;
+
     public void setAccessableName(String name)
     {
         _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -106,6 +108,11 @@
         return _routingTable;
     }
 
+    public VirtualHostConfiguration getConfiguration()
+    {
+        return _configuration ;
+    }
+
     /**
      * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
      * implementaion of an Exchange MBean should extend this class.
@@ -137,7 +144,6 @@
 
     /**
      * Normal Constructor
-     * @param name
      * @param hostConfig
      * @throws Exception
      */
@@ -148,6 +154,7 @@
 
     public VirtualHost(VirtualHostConfiguration hostConfig, TransactionLog transactionLog) throws Exception
     {
+        _configuration = hostConfig;
         _name = hostConfig.getName();
         
         if (_name == null || _name.length() == 0)

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java?rev=747754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
(added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/FileQueueBackingStoreTest.java
Wed Feb 25 11:32:24 2009
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.io.File;
+
+public class FileQueueBackingStoreTest extends TestCase
+{
+    FileQueueBackingStore _backing;
+    private TransactionLog _transactionLog;
+    VirtualHost _vhost;
+        VirtualHostConfiguration _vhostConfig;
+
+    public void setUp() throws Exception
+    {
+        _backing = new FileQueueBackingStore();
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        config.addProperty("store.class", MemoryMessageStore.class.getName());
+        _vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", config);
+        _vhost = new VirtualHost(_vhostConfig);
+        _transactionLog = _vhost.getTransactionLog();
+
+        _backing.configure(_vhost, _vhost.getConfiguration());
+
+    }
+
+    private void resetBacking(Configuration configuration) throws Exception
+    {
+        configuration.addProperty("store.class", MemoryMessageStore.class.getName());
+        _vhostConfig = new VirtualHostConfiguration(this.getName() + "-Vhost", configuration);
+        _vhost = new VirtualHost(_vhostConfig);
+        _transactionLog = _vhost.getTransactionLog();
+
+        _backing = new FileQueueBackingStore();
+
+        _backing.configure(_vhost, _vhost.getConfiguration());
+    }
+
+    public void testInvalidSetupRootExistsIsFile() throws Exception
+    {
+
+        File fileAsRoot = File.createTempFile("tmpRoot", "");
+        fileAsRoot.deleteOnExit();
+
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty(VirtualHostConfiguration.FLOW_TO_DISK_PATH, fileAsRoot.getAbsolutePath());
+
+        try
+        {
+            resetBacking(configuration);
+            fail("Exception expected to be thrown");
+        }
+        catch (ConfigurationException ce)
+        {
+            assertTrue("Expected Exception not thrown, expecting:" +
+                       "Unable to create Temporary Flow to Disk store as specified root is
a file:",
+                       ce.getMessage().
+                               startsWith("Unable to create Temporary Flow to Disk store
as specified root is a file:"));
+        }
+
+    }
+
+    public void testInvalidSetupRootExistsCantWrite() throws Exception
+    {
+
+        File fileAsRoot = new File("/var/log");
+
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+
+        configuration.addProperty(VirtualHostConfiguration.FLOW_TO_DISK_PATH, fileAsRoot.getAbsolutePath());
+
+        try
+        {
+            resetBacking(configuration);
+            fail("Exception expected to be thrown");
+        }
+        catch (ConfigurationException ce)
+        {
+            assertEquals("Unable to create Temporary Flow to Disk store. Unable to write
to specified root:/var/log",
+                         ce.getMessage());
+        }
+
+    }
+
+    public void testEmptyTransientFlowToDisk() throws UnableToFlowMessageException, AMQException
+    {
+        AMQMessage original = MessageFactory.getInstance().createMessage(null, false);
+
+        ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(),
BasicPublishBodyImpl.CLASS_ID);
+        chb.bodySize = 0L;
+
+        runTestWithMessage(original, chb);
+    }
+
+    public void testEmptyPersistentFlowToDisk() throws UnableToFlowMessageException, AMQException
+    {
+
+        AMQMessage original = MessageFactory.getInstance().createMessage(_transactionLog,
true);
+        ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(),
BasicPublishBodyImpl.CLASS_ID);
+        chb.bodySize = 0L;
+        ((BasicContentHeaderProperties) chb.properties).setDeliveryMode((byte) 2);
+
+        runTestWithMessage(original, chb);
+
+    }
+
+    public void testNonEmptyTransientFlowToDisk() throws UnableToFlowMessageException, AMQException
+    {
+        AMQMessage original = MessageFactory.getInstance().createMessage(null, false);
+
+        ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(),
BasicPublishBodyImpl.CLASS_ID);
+        chb.bodySize = 100L;
+
+        runTestWithMessage(original, chb);
+    }
+
+    public void testNonEmptyPersistentFlowToDisk() throws UnableToFlowMessageException, AMQException
+    {
+        AMQMessage original = MessageFactory.getInstance().createMessage(_transactionLog,
true);
+        ContentHeaderBody chb = new ContentHeaderBody(new BasicContentHeaderProperties(),
BasicPublishBodyImpl.CLASS_ID);
+        chb.bodySize = 100L;
+        ((BasicContentHeaderProperties) chb.properties).setDeliveryMode((byte) 2);
+
+        runTestWithMessage(original, chb);
+    }
+
+    void runTestWithMessage(AMQMessage original, ContentHeaderBody chb) throws UnableToFlowMessageException,
AMQException
+    {
+
+        // Create message
+
+        original.setPublishAndContentHeaderBody(null,
+                                                new MessagePublishInfoImpl(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
+                                                                           false, false,
new AMQShortString("routing")),
+                                                chb);
+        if (chb.bodySize > 0)
+        {
+            ContentChunk chunk = new MockContentChunk((int) chb.bodySize/2);
+
+            original.addContentBodyFrame(null, chunk, false);
+
+            chunk = new MockContentChunk((int) chb.bodySize/2);
+
+            original.addContentBodyFrame(null, chunk, true);            
+        }
+
+        _backing.flow(original);
+
+        AMQMessage fromDisk = _backing.recover(original.getMessageId());
+
+        assertEquals("Message IDs do not match", original.getMessageId(), fromDisk.getMessageId());
+        assertEquals("Message arrival times do not match", original.getArrivalTime(), fromDisk.getArrivalTime());
+        assertEquals(original.isPersistent(), fromDisk.isPersistent());
+
+        // Validate the MPI data was restored correctly
+        MessagePublishInfo originalMPI = original.getMessagePublishInfo();
+        MessagePublishInfo fromDiskMPI = fromDisk.getMessagePublishInfo();
+        assertEquals("Exchange", originalMPI.getExchange(), fromDiskMPI.getExchange());
+        assertEquals(originalMPI.isImmediate(), fromDiskMPI.isImmediate());
+        assertEquals(originalMPI.isMandatory(), fromDiskMPI.isMandatory());
+        assertEquals(originalMPI.getRoutingKey(), fromDiskMPI.getRoutingKey());
+
+        // Validate BodyCounts.
+        int originalBodyCount = original.getBodyCount();
+        assertEquals(originalBodyCount, fromDisk.getBodyCount());
+
+        if (originalBodyCount > 0)
+        {
+            for (int index = 0; index < originalBodyCount; index++)
+            {
+                ContentChunk originalChunk = original.getContentChunk(index);
+                ContentChunk fromDiskChunk = fromDisk.getContentChunk(index);
+
+                assertEquals(originalChunk.getSize(), fromDiskChunk.getSize());
+                assertEquals(originalChunk.getData(), fromDiskChunk.getData());
+            }
+        }
+
+    }
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message