activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r754990 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/stomp/ test/java/org/apache/activemq/broker/openwire/stomp/
Date Mon, 16 Mar 2009 19:58:59 GMT
Author: chirino
Date: Mon Mar 16 19:58:58 2009
New Revision: 754990

URL: http://svn.apache.org/viewvc?rev=754990&view=rev
Log:
Fixed up flow control for the Stomp protocol

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Mon Mar 16 19:58:58 2009
@@ -38,10 +38,11 @@
     protected int outputResumeThreshold = 900;
     protected int inputWindowSize = 1000;
     protected int inputResumeThreshold = 500;
+    protected boolean useAsyncWriteThread = true;
     
     private IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
-    private  ExecutorService blockingWriter;
+    private ExecutorService blockingWriter;
     private ExceptionListener exceptionListener;
     
     
@@ -58,7 +59,9 @@
             }
             dt.setDispatcher(getDispatcher());
         } else {
-            blockingWriter = Executors.newSingleThreadExecutor();
+            if( useAsyncWriteThread ) {
+                blockingWriter = Executors.newSingleThreadExecutor();
+            }
         }
         transport.start();
     }
@@ -77,9 +80,16 @@
     }
     
     public final void write(final Object o) {
+        write(o, null);
+    }
+    
+    public final void write(final Object o, final Runnable onCompleted) {
         if (blockingWriter==null) {
             try {
                 transport.oneway(o);
+                if( onCompleted!=null ) {
+                    onCompleted.run();
+                }
             } catch (IOException e) {
                 onException(e);
             }
@@ -90,6 +100,9 @@
                         if (!stopping.get()) {
                             try {
                                 transport.oneway(o);
+                                if( onCompleted!=null ) {
+                                    onCompleted.run();
+                                }
                             } catch (IOException e) {
                                 onException(e);
                             }
@@ -172,4 +185,12 @@
         this.exceptionListener = exceptionListener;
     }
 
+    public boolean isUseAsyncWriteThread() {
+        return useAsyncWriteThread;
+    }
+
+    public void setUseAsyncWriteThread(boolean useAsyncWriteThread) {
+        this.useAsyncWriteThread = useAsyncWriteThread;
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 16 19:58:58 2009
@@ -34,7 +34,6 @@
     public StompMessageDelivery(StompFrame frame, Destination destiantion) {
         this.frame = frame;
         this.destination = destiantion;
-        this.frame.setAction(Stomp.Responses.MESSAGE);
         this.receiptId = frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED);
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 16 19:58:58 2009
@@ -105,6 +105,7 @@
                 String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
                 Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
                 
+                frame.setAction(Stomp.Responses.MESSAGE);
                 StompMessageDelivery md = new StompMessageDelivery(frame, destination);
                 while (!inboundController.offer(md, null)) {
                     inboundController.waitForFlowUnblock();
@@ -165,9 +166,13 @@
         limiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
         outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), limiter);
         outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-            public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+            public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
                 StompFrame msg = message.asType(StompFrame.class);
-                connection.write(msg);
+                connection.write(msg, new Runnable() {
+                    public void run() {
+                        controller.elementDispatched(message);
+                    }
+                });
             };
         });
 
@@ -187,6 +192,9 @@
             actionHander.onStompFrame(command);
         } catch (Exception error) {
             try {
+                
+                error.printStackTrace();
+                
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
                 error.printStackTrace(stream);
@@ -343,22 +351,25 @@
             if (stompMessage == null) {
                 return false;
             }
-
-            Message msg = message.asType(Message.class);
-            if (msg == null) {
-                return false;
-            }
-
-            // TODO: abstract the Selector bits so that it is not openwire specific.
-            MessageEvaluationContext selectorContext = new MessageEvaluationContext();
-            selectorContext.setMessageReference(msg);
-            selectorContext.setDestination(msg.getDestination());
-            try {
-                return (selector == null || selector.matches(selectorContext));
-            } catch (JMSException e) {
-                e.printStackTrace();
-                return false;
-            }
+            
+            return true;
+            
+//          TODO: implement selector bits.
+//            Message msg = message.asType(Message.class);
+//            if (msg == null) {
+//                return false;
+//            }
+//
+//            // TODO: abstract the Selector bits so that it is not openwire specific.
+//            MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+//            selectorContext.setMessageReference(msg);
+//            selectorContext.setDestination(msg.getDestination());
+//            try {
+//                return (selector == null || selector.matches(selectorContext));
+//            } catch (JMSException e) {
+//                e.printStackTrace();
+//                return false;
+//            }
         }
 
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java Mon Mar 16 19:58:58 2009
@@ -23,6 +23,8 @@
     private FlowController<MessageDelivery> inboundController;
     private String stompDestination;
     
+    public StompRemoteConsumer() {
+    }
 
     protected void setupSubscription() throws Exception, IOException {
         if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java Mon Mar 16 19:58:58 2009
@@ -19,6 +19,9 @@
 
     private String stompDestination;
 
+    StompRemoteProducer() {
+    }
+    
     protected void setupProducer() throws Exception, IOException {
         if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
             stompDestination = "/queue/"+destination.getName().toString();
@@ -40,9 +43,13 @@
         
         outboundController = outboundQueue.getFlowController(flow);
         outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
-            public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
+            public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
                 StompFrame msg = message.asType(StompFrame.class);
-                write(msg);
+                write(msg, new Runnable(){
+                    public void run() {
+                        controller.elementDispatched(message);
+                    }
+                });
             }
         });
     }
@@ -77,7 +84,11 @@
             headers.put(property, property);
         }
         
-        StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers, toContent(createPayload()));
+        byte[] content = toContent(createPayload());
+        
+        headers.put(Stomp.Headers.CONTENT_LENGTH, ""+content.length);
+        
+        StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers, content);
         next = new StompMessageDelivery(fram, getDestination());
     }
 



Mime
View raw message