activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r757475 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker: ./ openwire/ stomp/ store/ store/memory/
Date Mon, 23 Mar 2009 18:05:40 GMT
Author: chirino
Date: Mon Mar 23 18:05:40 2009
New Revision: 757475

URL: http://svn.apache.org/viewvc?rev=757475&view=rev
Log:
working on the Store interface

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
Mon Mar 23 18:05:40 2009
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 public interface MessageDelivery {
 
@@ -47,4 +48,6 @@
     public void onMessagePersisted();
     
     public Store.Session.MessageRecord createMessageRecord();
+
+    public Buffer getTransactionId();
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon
Mar 23 18:05:40 2009
@@ -26,8 +26,12 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.QueueDomain;
 import org.apache.activemq.broker.TopicDomain;
+import org.apache.activemq.broker.store.Store.Callback;
+import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.VoidCallback;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 final public class Router {
 
@@ -35,6 +39,7 @@
     public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
 
     private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
+    private VirtualHost virtualHost;
 
     public Router() {
         domains.put(QUEUE_DOMAIN, new QueueDomain());
@@ -60,6 +65,28 @@
 
     public void route(final MessageDelivery msg, ISourceController<?> controller) {
 
+//        final Buffer transactionId = msg.getTransactionId();
+//        if( msg.isPersistent() ) {
+//            VoidCallback<RuntimeException> tx = new VoidCallback<RuntimeException>() {
+//                @Override
+//                public void run(Session session) throws RuntimeException {
+//                    Long messageKey = session.messageAdd(msg.createMessageRecord());
+//                    if( transactionId!=null ) {
+//                        session.transactionAddMessage(transactionId, messageKey);
+//                    }
+//                }
+//            };
+//            Runnable onFlush = new Runnable() {
+//                public void run() {
+//                    if( msg.isResponseRequired() ) {
+//                        // Let the client know the broker got the message.
+//                        msg.onMessagePersisted();
+//                    }
+//                }
+//            };
+//            virtualHost.getStore().execute(tx, onFlush);
+//        }
+//        
         Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
 
         // TODO:
@@ -70,8 +97,7 @@
             if (msg.isResponseRequired()) {
                 // We need to ack the message once we ensure we won't loose it.
                 // We know we won't loose it once it's persisted or delivered to
-                // a consumer
-                // Setup a callback to get notifed once one of those happens.
+                // a consumer. Set up a callback to get notified once one of those happens.
                 if (!msg.isPersistent()) {
                     // Let the client know the broker got the message.
                     msg.onMessagePersisted();
@@ -112,4 +138,12 @@
         }
     }
 
+    public void setVirtualHost(VirtualHost virtualHost) {
+        this.virtualHost = virtualHost;
+    }
+
+    public VirtualHost getVirtualHost() {
+        return virtualHost;
+    }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
Mon Mar 23 18:05:40 2009
@@ -20,6 +20,8 @@
 import java.util.HashMap;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.memory.MemoryStore;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 /**
@@ -27,10 +29,15 @@
  */
 public class VirtualHost implements Service {
     
+    final private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
     private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
-    private Router router = new Router();
-    private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
-
+    private Router router;
+    private Store store = new MemoryStore();
+    
+    public VirtualHost() {
+        setRouter(new Router());
+    }
+    
     public AsciiBuffer getHostName() {
         if( hostNames.size() > 0 ) {
             hostNames.get(0);
@@ -48,7 +55,11 @@
     public Router getRouter() {
         return router;
     }
-    
+    public void setRouter(Router router) {
+        this.router = router; // assign first: the field is still null when the constructor calls this setter
+        router.setVirtualHost(this); // register this host on the NEW router, not the one being replaced
+    }
+
     public void start() throws Exception {
         for (Queue queue : queues.values()) {
             queue.start();
@@ -65,5 +76,13 @@
         domain.add(queue.getDestination().getName(), queue);
     }
 
+    public Store getStore() {
+        return store;
+    }
+
+    public void setStore(Store store) {
+        this.store = store;
+    }
+
 
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
Mon Mar 23 18:05:40 2009
@@ -21,6 +21,7 @@
 import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 public class OpenWireMessageDelivery implements MessageDelivery {
 
@@ -110,4 +111,9 @@
         return record;
     }
 
+    public Buffer getTransactionId() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
Mon Mar 23 18:05:40 2009
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 
@@ -131,4 +132,9 @@
         record.setMessageId(getMsgId());
         return record;
     }
+
+    public Buffer getTransactionId() {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
Mon Mar 23 18:05:40 2009
@@ -157,12 +157,12 @@
         public boolean streamRemove(Long key);
 
         // Transaction related methods.
-        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first, int max);
-        public void transactionAdd(AsciiBuffer txid);
-        public void transactionAddMessage(AsciiBuffer txid, Long messageKey);
-        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queueName, Long messageKey);
-        public boolean transactionCommit(AsciiBuffer txid);
-        public boolean transactionRollback(AsciiBuffer txid);
+        public Iterator<Buffer> transactionList(Buffer first, int max);
+        public void transactionAdd(Buffer txid);
+        public void transactionAddMessage(Buffer txid, Long messageKey);
+        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey);
+        public boolean transactionCommit(Buffer txid);
+        public boolean transactionRollback(Buffer txid);
         
         // Queue related methods.
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=757475&r1=757474&r2=757475&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Mon Mar 23 18:05:40 2009
@@ -177,26 +177,25 @@
         // ///////////////////////////////////////////////////////////////////////////////
         // Transaction related methods
         // ///////////////////////////////////////////////////////////////////////////////
-        public Iterator<AsciiBuffer> transactionList(AsciiBuffer first, int max) {
+        public Iterator<Buffer> transactionList(Buffer first, int max) {
             throw new UnsupportedOperationException();
         }
-        public void transactionAdd(AsciiBuffer txid) {
+        public void transactionAdd(Buffer txid) {
             throw new UnsupportedOperationException();
         }
-        public void transactionAddMessage(AsciiBuffer txid, Long messageKey) {
+        public void transactionAddMessage(Buffer txid, Long messageKey) {
             throw new UnsupportedOperationException();
         }
-        public void transactionRemoveMessage(AsciiBuffer txid, AsciiBuffer queue, Long messageKey) {
+        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queue, Long messageKey) {
             throw new UnsupportedOperationException();
         }
-        public boolean transactionCommit(AsciiBuffer txid) {
+        public boolean transactionCommit(Buffer txid) {
             throw new UnsupportedOperationException();
         }
-        public boolean transactionRollback(AsciiBuffer txid) {
+        public boolean transactionRollback(Buffer txid) {
             throw new UnsupportedOperationException();
         }
 
-
     }
 
     public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {



Mime
View raw message