activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r757438 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker: ./ openwire/ stomp/ store/ store/memory/
Date Mon, 23 Mar 2009 16:58:43 GMT
Author: chirino
Date: Mon Mar 23 16:58:42 2009
New Revision: 757438

URL: http://svn.apache.org/viewvc?rev=757438&view=rev
Log:
Massaging the Store interface some more..

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=757438&r1=757437&r2=757438&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 23 16:58:42 2009
@@ -60,4 +60,8 @@
      * @return
      */
     public Buffer getMessageBuffer();
+
+    public AsciiBuffer getEncoding();
+
+    public long getStreamId();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=757438&r1=757437&r2=757438&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 23 16:58:42 2009
@@ -24,6 +24,8 @@
 
 public class OpenWireMessageDelivery implements MessageDelivery {
 
+    static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire");
+
     private final Message message;
     private Destination destination;
     private AsciiBuffer producerId;
@@ -115,4 +117,12 @@
         return message.isResponseRequired();
     }
 
+    public AsciiBuffer getEncoding() {
+        return ENCODING;
+    }
+
+    public long getStreamId() {
+        return 0;
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=757438&r1=757437&r2=757438&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 23 16:58:42 2009
@@ -25,6 +25,8 @@
 
 public class StompMessageDelivery implements MessageDelivery {
 
+    static final private AsciiBuffer ENCODING = new AsciiBuffer("stomp");
+
     private final StompFrame frame;
     private Destination destination;
     private Runnable completionCallback;
@@ -138,4 +140,12 @@
             persistListener = null;
         }
     }
+
+    public AsciiBuffer getEncoding() {
+        return ENCODING;
+    }
+
+    public long getStreamId() {
+        return 0;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=757438&r1=757437&r2=757438&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon Mar 23 16:58:42 2009
@@ -25,15 +25,11 @@
 import org.apache.activemq.broker.DeliveryTarget;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store.Callback;
-import org.apache.activemq.broker.store.Store.RecordKey;
 import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.Session.DuplicateKeyException;
 import org.apache.activemq.broker.store.Store.Session.QueueNotFoundException;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.ExclusiveQueue;
 import org.apache.activemq.queue.IPollableFlowSource;
 import org.apache.activemq.queue.IPollableFlowSource.FlowReadyListener;
@@ -330,11 +326,9 @@
     private class AddMessageOperation extends OperationBase {
         private final MessageDelivery delivery;
         private final Collection<DeliveryTarget> targets;
-        private final Buffer messageBuffer;
         
         public AddMessageOperation(MessageDelivery delivery, Collection<DeliveryTarget> targets) {
             this.delivery = delivery;
-            this.messageBuffer = delivery.getMessageBuffer();
             this.targets = targets;
             
         }
@@ -346,16 +340,22 @@
         @Override
         protected void doExcecute(Session session) {
             // TODO need to get at protocol buffer.
-            RecordKey key = session.messageAdd(delivery.getMsgId(), messageBuffer);
+            
+            Session.MessageRecord record = new Session.MessageRecord();
+            record.setMessageId(delivery.getMsgId());
+            record.setEncoding(delivery.getEncoding());
+            record.setBuffer(delivery.getMessageBuffer());
+            record.setStreamKey(delivery.getStreamId());
+            
+            Long key = session.messageAdd(record);
             for(DeliveryTarget target : targets)
             {
                 try {
-                    session.queueAddMessage(new AsciiBuffer(target.getPersistentQueueName()), key, null);
+                    Session.QueueRecord queueRecord = new Session.QueueRecord();
+                    queueRecord.setAttachment(null);
+                    queueRecord.setMessageKey(key);
+                    session.queueAddMessage(target.getPersistentQueueName(), queueRecord);
                 } catch (QueueNotFoundException e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                } catch (DuplicateKeyException e) {
-                    // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=757438&r1=757437&r2=757438&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Mon Mar 23 16:58:42 2009
@@ -80,11 +80,6 @@
      */
     public void flush();
 
-
-    interface RecordKey {
-        
-    }
-    
     /**
      * This interface allows you to query and update the Store.
      * 
@@ -112,30 +107,95 @@
         
         
         // Message related methods.
-        public RecordKey messageAdd(AsciiBuffer messageId, Buffer message);
-        public RecordKey messageGetKey(AsciiBuffer messageId);
-        public Buffer messageGet(RecordKey key);
-
-        // Message Chunking related methods.
-        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer message);
-        public void messageChunkAdd(RecordKey key, Buffer message);
-        public void messageChunkClose(RecordKey key);
-        public Buffer messageChunkGet(RecordKey key, int offset, int max);
+        public static class MessageRecord {
+            Long key;
+            AsciiBuffer messageId;
+            AsciiBuffer encoding;
+            Buffer buffer;
+            Long streamKey;
+            
+            public Long getKey() {
+                return key;
+            }
+            public void setKey(Long key) {
+                this.key = key;
+            }
+            public AsciiBuffer getMessageId() {
+                return messageId;
+            }
+            public void setMessageId(AsciiBuffer messageId) {
+                this.messageId = messageId;
+            }
+            public AsciiBuffer getEncoding() {
+                return encoding;
+            }
+            public void setEncoding(AsciiBuffer encoding) {
+                this.encoding = encoding;
+            }
+            public Buffer getBuffer() {
+                return buffer;
+            }
+            public void setBuffer(Buffer buffer) {
+                this.buffer = buffer;
+            }
+            public Long getStreamKey() {
+                return streamKey;
+            }
+            public void setStreamKey(Long stream) {
+                this.streamKey = stream;
+            }
+        }
+
+        public Long messageAdd(MessageRecord message);
+        public Long messageGetKey(AsciiBuffer messageId);
+        public MessageRecord messageGetRecord(Long key);
+
+        public Long streamOpen();
+        public void streamWrite(Long key, Buffer message);
+        public void streamClose(Long key);
+        public Buffer streamRead(Long key, int offset, int max);
+        public boolean streamRemove(Long key);
 
-        // / Transaction related methods.
-        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first);
+        // Transaction related methods.
+        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first, int max);
         public void transactionAdd(AsciiBuffer txid);
-        public boolean transactionRemove(AsciiBuffer txid);
-        public void transactionAddMessage(AsciiBuffer txid, RecordKey messageKey);
-        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, RecordKey messageKey);
+        public void transactionAddMessage(AsciiBuffer txid, Long messageKey);
+        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queueName, Long messageKey);
+        public boolean transactionCommit(AsciiBuffer txid);
+        public boolean transactionRollback(AsciiBuffer txid);
         
-        // / Queue related methods.
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer first);
-        public void queueAdd(AsciiBuffer queue);
-        public boolean queueRemove(AsciiBuffer queue);
-        public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException;
-        public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException;
-        public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max);
+        // Queue related methods.
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
+        public void queueAdd(AsciiBuffer queueName);
+        public boolean queueRemove(AsciiBuffer queueName);
+        
+        public static class QueueRecord {
+            Long queueKey;
+            Long messageKey;
+            Buffer attachment;
+            
+            public Long getQueueKey() {
+                return queueKey;
+            }
+            public void setQueueKey(Long queueKey) {
+                this.queueKey = queueKey;
+            }
+            public Long getMessageKey() {
+                return messageKey;
+            }
+            public void setMessageKey(Long messageKey) {
+                this.messageKey = messageKey;
+            }
+            public Buffer getAttachment() {
+                return attachment;
+            }
+            public void setAttachment(Buffer attachment) {
+                this.attachment = attachment;
+            }
+        }
+        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws QueueNotFoundException;
+        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws QueueNotFoundException;
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max);
 
         // We could use this to associate additional data to a message on a
         // queue like which consumer a message has been dispatched to.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=757438&r1=757437&r2=757438&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Mon Mar 23 16:58:42 2009
@@ -31,124 +31,105 @@
     MemorySession session = new MemorySession();
 
     private class MemorySession implements Session {
-        private HashMap<RecordKey, Buffer> messages = new HashMap<RecordKey, Buffer>();
-        private HashMap<AsciiBuffer, TreeMap<RecordKey, Buffer>> queues = new HashMap<AsciiBuffer, TreeMap<RecordKey, Buffer>>();
-
-        // private HashMap<String, LinkedList<RecordKey>> queues = new
-        // HashMap<String, LinkedList<RecordKey>>();
-
-        public void beginTx() {
-
-        }
-
-        public void commitTx() {
-
+        
+        long messageSequence;
+        
+        private HashMap<Long, MessageRecord> messages = new HashMap<Long, MessageRecord>();
+        private HashMap<AsciiBuffer, Long> messagesKeys = new HashMap<AsciiBuffer, Long>();
+        
+        private class StoredQueue {
+            long sequence;
+            TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>();
         }
+        
+        private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
 
-        public void rollback() {
-            throw new UnsupportedOperationException();
-        }
 
         // //////////////////////////////////////////////////////////////////////////////
         // Message related methods.
         // ///////////////////////////////////////////////////////////////////////////////
-        public RecordKey messageAdd(AsciiBuffer messageId, Buffer message) {
-            RecordKey key = new MemoryRecordKey(messageId);
-            messages.put(key, message);
+        public Long messageAdd(MessageRecord record) {
+            Long key = ++messageSequence;
+            record.setKey(key);
+            messages.put(key, record);
+            messagesKeys.put(record.getMessageId(), key);
             return key;
         }
+ 
+        public Long messageGetKey(AsciiBuffer messageId) {
+            return messagesKeys.get(messageId);
+        }
 
-        public Buffer messageGet(RecordKey key) {
+        public MessageRecord messageGetRecord(Long key) {
             return messages.get(key);
         }
- 
-        public RecordKey messageGetKey(AsciiBuffer messageId) {
-            MemoryRecordKey key = new MemoryRecordKey(messageId);
-            return messages.containsKey(key) ? key : null;
-        }
 
         // //////////////////////////////////////////////////////////////////////////////
         // Queue related methods.
         // ///////////////////////////////////////////////////////////////////////////////
-        public void queueAdd(AsciiBuffer queue) {
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
-            if (messages == null) {
-                messages = new TreeMap<RecordKey, Buffer>();
-                queues.put(queue, messages);
+        public void queueAdd(AsciiBuffer queueName) {
+            StoredQueue queue = queues.get(queueName);
+            if (queue == null) {
+                queue = new StoredQueue();
+                queues.put(queueName, queue);
             }
         }
 
-        public void queueAddMessage(AsciiBuffer queue, RecordKey key, Buffer attachment) throws QueueNotFoundException, DuplicateKeyException {
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
-            if (messages != null) {
-                if (messages.put(key, attachment) != null) {
-                    throw new DuplicateKeyException("");
-                }
+        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws QueueNotFoundException {
+            StoredQueue queue = queues.get(queueName);
+            if (queue != null) {
+                Long key = ++queue.sequence;
+                record.setQueueKey(key);
+                queue.records.put(key, record);
+                return key;
             } else {
-                throw new QueueNotFoundException(queue.toString());
+                throw new QueueNotFoundException(queueName.toString());
             }
         }
 
-        public void queueRemoveMessage(AsciiBuffer queue, RecordKey key) throws QueueNotFoundException {
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
-            if (messages != null) {
-                messages.remove(key);
-            } else {
-                throw new QueueNotFoundException(queue.toString());
+        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws QueueNotFoundException {
+            StoredQueue queue = queues.get(queueName);
+            if (queue == null) {
+                throw new QueueNotFoundException(queueName.toString());
             }
+            queue.records.remove(queueKey);
         }
 
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer first) {
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
             ArrayList<AsciiBuffer> list = new ArrayList<AsciiBuffer>(queues.size());
-            for (AsciiBuffer queue : queues.keySet()) {
+            for (AsciiBuffer queue : queues.tailMap(firstQueueName).keySet()) {
+                if( list.size() >= max ) {
+                    break;
+                }
                 list.add(queue);
             }
             return list.iterator();
         }
 
-        public Iterator<RecordKey> queueListMessagesQueue(AsciiBuffer queue, RecordKey firstRecord, int max) {
-            ArrayList<RecordKey> list = new ArrayList<RecordKey>(max);
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
-            if (messages != null) {
-                for (RecordKey key : messages.tailMap(firstRecord).keySet() ) {
-                    list.add(key);
-                    if (list.size() == max) {
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) {
+            ArrayList<QueueRecord> list = new ArrayList<QueueRecord>(max);
+            StoredQueue queue = queues.get(queueName);
+            if (queue != null) {
+                for (Long key : queue.records.tailMap(firstQueueKey).keySet() ) {
+                    if (list.size() >= max) {
                         break;
                     }
+                    list.add(queue.records.get(key));
                 }
             }
             return list.iterator();
         }
 
-        public boolean queueRemove(AsciiBuffer queue) {
-            TreeMap<RecordKey, Buffer> messages = queues.get(queue);
-            if (messages != null) {
-                Iterator<RecordKey> msgKeys = messages.keySet().iterator();
-                while (msgKeys.hasNext()) {
-                    RecordKey msgKey = msgKeys.next();
-                    try {
-                        queueRemoveMessage(queue, msgKey);
-                    } catch (QueueNotFoundException e) {
-                        // Can't happen.
-                    }
-                }
-                queues.remove(queue);
-
+        public boolean queueRemove(AsciiBuffer queueName) {
+            StoredQueue queue = queues.get(queueName);
+            if (queue != null) {
+                queue.records.clear();
+                queues.remove(queueName);
                 return true;
             }
             return false;
         }
 
-        /*
-         * public void queueUpdateMessageAttachment(AsciiBuffer queue, RecordKey
-         * key, Buffer attachment) { // TODO Auto-generated method stub }
-         * 
-         * public Buffer queueGetMessageAttachment(AsciiBuffer queue, RecordKey
-         * key) throws QueueNotFoundException { TreeMap<RecordKey, Buffer>
-         * messages = queues.get(queue); if (messages != null) {
-         * messages.add(key); } else { throw new
-         * QueueNotFoundException(queue.toString()); } }
-         */
 
         // //////////////////////////////////////////////////////////////////////////////
         // Simple Key Value related methods could come in handy to store misc
@@ -175,67 +156,47 @@
         }
 
         // ///////////////////////////////////////////////////////////////////////////////
-        // Message Chunking related methods
+        // Stream related methods
         // ///////////////////////////////////////////////////////////////////////////////
-        public void messageChunkAdd(RecordKey key, Buffer message) {
+        public Long streamOpen() {
             throw new UnsupportedOperationException();
         }
-
-        public void messageChunkClose(RecordKey key) {
+        public void streamWrite(Long key, Buffer message) {
             throw new UnsupportedOperationException();
         }
-
-        public Buffer messageChunkGet(RecordKey key, int offset, int max) {
+        public void streamClose(Long key) {
             throw new UnsupportedOperationException();
         }
-
-        public RecordKey messageChunkOpen(AsciiBuffer messageId, Buffer message) {
+        public Buffer streamRead(Long key, int offset, int max) {
+            throw new UnsupportedOperationException();
+        }
+        public boolean streamRemove(Long key) {
             throw new UnsupportedOperationException();
         }
 
+        // ///////////////////////////////////////////////////////////////////////////////
+        // Transaction related methods
+        // ///////////////////////////////////////////////////////////////////////////////
+        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first, int max) {
+            throw new UnsupportedOperationException();
+        }
         public void transactionAdd(AsciiBuffer txid) {
             throw new UnsupportedOperationException();
         }
-
-        public void transactionAddMessage(AsciiBuffer txid, RecordKey messageKey) {
+        public void transactionAddMessage(AsciiBuffer txid, Long messageKey) {
             throw new UnsupportedOperationException();
         }
-
-        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first) {
+        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, Long messageKey) {
             throw new UnsupportedOperationException();
         }
-
-        public boolean transactionRemove(AsciiBuffer txid) {
+        public boolean transactionCommit(AsciiBuffer txid) {
             throw new UnsupportedOperationException();
         }
-
-        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, RecordKey messageKey) {
+        public boolean transactionRollback(AsciiBuffer txid) {
             throw new UnsupportedOperationException();
         }
 
-    }
 
-    final private class MemoryRecordKey implements RecordKey {
-        final AsciiBuffer messageId;
-
-        MemoryRecordKey(AsciiBuffer messageId) {
-            this.messageId = messageId;
-        }
-        
-        @Override
-        public int hashCode() {
-            return messageId.hashCode();
-        }
-        
-        @Override
-        public boolean equals(Object obj) {
-            if( obj == null || obj.getClass()!=MemoryRecordKey.class )
-                return false;
-            if( this == obj )
-                return true;
-            MemoryRecordKey key = (MemoryRecordKey)obj;
-            return messageId.equals(key.messageId);
-        }
     }
 
     public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {



Mime
View raw message