activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r745641 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/queue/ test/java/org/apache/activemq/flow/
Date Wed, 18 Feb 2009 20:47:18 GMT
Author: chirino
Date: Wed Feb 18 20:47:16 2009
New Revision: 745641

URL: http://svn.apache.org/viewvc?rev=745641&view=rev
Log:
- You can now force marshalling in the pipe transport
- Better toString in the SingleFlowRelay
- Producer can now send big messages
- Support doing a traditional sleep in the consumer to simulate think time.


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java Wed Feb 18 20:47:16 2009
@@ -45,4 +45,9 @@
 		// TODO Auto-generated method stub
 		return this;
 	}
+	
+	@Override
+	public String toString() {
+	    return getResourceName();
+	}
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Wed Feb 18 20:47:16 2009
@@ -35,7 +35,7 @@
 
 public class MockBrokerTest extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 3;
+    protected static final int PERFORMANCE_SAMPLES = 30000;
 
     protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
@@ -52,6 +52,9 @@
 
     // Set to use tcp IO
     protected boolean tcp = true;
+    // set to force marshalling even in the NON tcp case.
+    protected boolean forceMarshalling = false;
+    
     protected String sendBrokerURI;
     protected String receiveBrokerURI;
 
@@ -91,8 +94,13 @@
             sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
             receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
         } else {
-            sendBrokerURI = "pipe://SendBroker";
-            receiveBrokerURI = "pipe://ReceiveBroker";
+            if( forceMarshalling ) {
+                sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
+                receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
+            } else {
+                sendBrokerURI = "pipe://SendBroker";
+                receiveBrokerURI = "pipe://ReceiveBroker";
+            }
         }
     }
     
@@ -423,13 +431,12 @@
     }
 
     private void stopServices() throws Exception {
-        for (MockBroker broker : brokers) {
-            broker.stopServices();
-        }
         if (dispatcher != null) {
             dispatcher.shutdown();
         }
-
+        for (MockBroker broker : brokers) {
+            broker.stopServices();
+        }
     }
 
     private void startServices() throws Exception {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Wed Feb 18 20:47:16 2009
@@ -4,7 +4,9 @@
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -21,6 +23,11 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
 
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,6 +45,7 @@
         private Thread thread;
         private DispatchContext readContext;
         private String name;
+        private WireFormat wireFormat;
 
         public PipeTransport(Pipe<Object> pipe) {
             this.pipe = pipe;
@@ -79,7 +87,11 @@
         public void oneway(Object command) throws IOException {
 
             try {
-                pipe.write(command);
+                if( wireFormat!=null ) {
+                    pipe.write(wireFormat.marshal(command));
+                } else {
+                    pipe.write(command);
+                }
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
@@ -92,13 +104,20 @@
 
         public boolean dispatch() {
             while (true) {
-
-                Object o = pipe.poll();
-                if (o == null) {
-                    pipe.setReadReadyListener(this);
-                    return true;
-                } else {
-                    listener.onCommand(o);
+                try {
+                    Object o = pipe.poll();
+                    if (o == null) {
+                        pipe.setReadReadyListener(this);
+                        return true;
+                    } else {
+                        if( wireFormat!=null ) {
+                            listener.onCommand(wireFormat.unmarshal((ByteSequence)o));
+                        } else {
+                            listener.onCommand(o);
+                        }
+                    }
+                } catch (IOException e) {
+                    listener.onException(e);
                 }
             }
         }
@@ -169,12 +188,17 @@
                 name = remoteAddress;
             }
         }
+
+        public void setWireFormat(WireFormat wireFormat) {
+            this.wireFormat = wireFormat;
+        }
     }
 
     private class PipeTransportServer implements TransportServer {
         private URI connectURI;
         private TransportAcceptListener listener;
         private String name;
+        private WireFormatFactory wireFormatFactory;
 
         public URI getConnectURI() {
             return connectURI;
@@ -219,22 +243,39 @@
             rc.setRemoteAddress(remoteAddress);
             PipeTransport serverSide = new PipeTransport(pipe.connect());
             serverSide.setRemoteAddress(remoteAddress);
+            if( wireFormatFactory!=null ) {
+                rc.setWireFormat(wireFormatFactory.createWireFormat());
+                serverSide.setWireFormat(wireFormatFactory.createWireFormat());
+            }
             listener.onAccept(serverSide);
             return rc;
         }
+
+        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+            this.wireFormatFactory = wireFormatFactory;
+        }
     }
 
     @Override
     public synchronized TransportServer doBind(URI uri) throws IOException {
-        String node = uri.getHost();
-        if (servers.containsKey(node)) {
-            throw new IOException("Server allready bound: " + node);
-        }
-        PipeTransportServer server = new PipeTransportServer();
-        server.setConnectURI(uri);
-        server.setName(node);
-        servers.put(node, server);
-        return server;
+        try {
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+
+            String node = uri.getHost();
+            if (servers.containsKey(node)) {
+                throw new IOException("Server allready bound: " + node);
+            }
+            PipeTransportServer server = new PipeTransportServer();
+            server.setConnectURI(uri);
+            server.setName(node);
+            if( options.containsKey("wireFormat") ) {
+                server.setWireFormatFactory(createWireFormatFactory(options));
+            }
+            servers.put(node, server);
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
     private synchronized void unbind(PipeTransportServer server) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java Wed Feb 18 20:47:16 2009
@@ -11,6 +11,8 @@
 import org.apache.activemq.flow.Commands.Message.MessageBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.StatefulWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
@@ -223,10 +225,14 @@
         }
 
         public ByteSequence marshal(Object value) throws IOException {
-            return null;
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+            marshal(value, os);
+            return os.toByteSequence();
         }
+        
         public Object unmarshal(ByteSequence data) throws IOException {
-            return null;
+            DataByteArrayInputStream is = new DataByteArrayInputStream(data);
+            return unmarshal(is);
         }
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java Wed Feb 18 20:47:16 2009
@@ -1,15 +1,12 @@
 package org.apache.activemq.flow;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 
 public class RemoteConsumer extends RemoteConnection{
@@ -20,6 +17,8 @@
     private long thinkTime;
     private Destination destination;
     private String selector;
+
+    private boolean schedualWait;
     
     public void start() throws Exception {
         consumerRate.name("Consumer " + name + " Rate");
@@ -32,6 +31,7 @@
             DispatchableTransport dt = ((DispatchableTransport)transport);
             dt.setName(name);
             dt.setDispatcher(getDispatcher());
+            schedualWait = true;
         }
         transport.setTransportListener(this);
         transport.start();
@@ -44,22 +44,34 @@
     }
     
     protected void messageReceived(final ISourceController<Message> controller, final Message elem) {
-	    if (thinkTime > 0) {
-	        getDispatcher().schedule(new Runnable(){
-
-                public void run() {
-                    consumerRate.increment();
-                    controller.elementDispatched(elem);
+        if( schedualWait ) {
+            if (thinkTime > 0) {
+                getDispatcher().schedule(new Runnable(){
+
+                    public void run() {
+                        consumerRate.increment();
+                        controller.elementDispatched(elem);
+                    }
+                    
+                }, thinkTime, TimeUnit.MILLISECONDS);
+                
+            }
+            else
+            {
+                consumerRate.increment();
+                controller.elementDispatched(elem);
+            }
+
+        } else {
+            if( thinkTime>0 ) {
+                try {
+                    Thread.sleep(thinkTime);
+                } catch (InterruptedException e) {
                 }
-	            
-	        }, thinkTime, TimeUnit.MILLISECONDS);
-            
+            }
+            consumerRate.increment();
+            controller.elementDispatched(elem);
         }
-	    else
-	    {
-	        consumerRate.increment();
-	        controller.elementDispatched(elem);
-	    }
     }
 
     public void setName(String name) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Wed Feb 18 20:47:16 2009
@@ -1,8 +1,6 @@
 package org.apache.activemq.flow;
 
-import java.io.IOException;
 import java.net.URI;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
@@ -12,12 +10,12 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
 
 public class RemoteProducer extends RemoteConnection implements Dispatchable, FlowUnblockListener<Message>{
 
+    private static final int FILLER_SIZE = 100;
+
     private final MetricCounter rate = new MetricCounter();
 
     private AtomicLong messageIdGenerator;
@@ -30,8 +28,17 @@
     private MetricAggregator totalProducerRate;
     Message next;
     private DispatchContext dispatchContext;
+
+    private String filler;
     
     public void start() throws Exception {
+        
+        StringBuilder sb = new StringBuilder(FILLER_SIZE);
+        for( int i=0; i < FILLER_SIZE; ++i) {
+            sb.append('a'+(i%26));
+        }
+        filler = sb.toString();
+        
         rate.name("Producer " + name + " Rate");
         totalProducerRate.add(rate);
 
@@ -75,7 +82,7 @@
 	                priority = counter % priorityMod == 0 ? 0 : priority;
 	            }
 	
-	            next = new Message(messageIdGenerator.getAndIncrement(), producerId, name + ++counter, null, destination, priority);
+	            next = new Message(messageIdGenerator.getAndIncrement(), producerId, createPayload(), null, destination, priority);
 	            if (property != null) {
 	                next.setProperty(property);
 	            }
@@ -95,6 +102,10 @@
 	        next = null;
 		}
 	}
+
+    private String createPayload() {
+        return name + ++counter+filler;
+    }
 	
 	public void setName(String name) {
         this.name = name;



Mime
View raw message