activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r752973 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/ test/java/org/apache/activemq/...
Date Thu, 12 Mar 2009 18:59:13 GMT
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=752973&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Thu Mar 12 18:59:12 2009
@@ -0,0 +1,298 @@
+package org.apache.activemq.broker.openwire;
+
+import static org.apache.activemq.broker.openwire.OpenWireSupport.createConnectionInfo;
+import static org.apache.activemq.broker.openwire.OpenWireSupport.createMessage;
+import static org.apache.activemq.broker.openwire.OpenWireSupport.createProducerInfo;
+import static org.apache.activemq.broker.openwire.OpenWireSupport.createSessionInfo;
+
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ProducerAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+public class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener<MessageDelivery>
{
+
+    private final MetricCounter rate = new MetricCounter();
+
+    private AtomicLong messageIdGenerator;
+    private int priority;
+    private int priorityMod;
+    private int counter;
+    private int producerId;
+    private Destination destination;
+    private String property;
+    private MetricAggregator totalProducerRate;
+    private MessageDelivery next;
+    private DispatchContext dispatchContext;
+    private String filler;
+    private int payloadSize = 20;
+    private URI uri;
+    private ConnectionInfo connectionInfo;
+    private SessionInfo sessionInfo;
+    private ProducerInfo producerInfo;
+    private ActiveMQDestination activemqDestination;
+
+    private WindowLimiter<MessageDelivery> outboundLimiter;
+
+    private IFlowController<MessageDelivery> outboundController;
+    
+    public void start() throws Exception {
+        
+        if( payloadSize>0 ) {
+            StringBuilder sb = new StringBuilder(payloadSize);
+            for( int i=0; i < payloadSize; ++i) {
+                sb.append((char)('a'+(i%26)));
+            }
+            filler = sb.toString();
+        }
+        
+        rate.name("Producer " + name + " Rate");
+        totalProducerRate.add(rate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        transport.setTransportListener(this);
+        if(transport instanceof DispatchableTransport)
+        {
+            DispatchableTransport dt = ((DispatchableTransport)transport);
+            dt.setName(name + "-client-transport");
+            dt.setDispatcher(getDispatcher());
+        }
+        super.setTransport(transport);
+       
+        super.initialize();
+        transport.start();
+        
+        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+            activemqDestination = new ActiveMQQueue(destination.getName().toString());
+        } else {
+            activemqDestination = new ActiveMQTopic(destination.getName().toString());
+        }
+        
+        connectionInfo = createConnectionInfo();
+        transport.oneway(connectionInfo);
+        sessionInfo = createSessionInfo(connectionInfo);
+        transport.oneway(sessionInfo);
+        producerInfo = createProducerInfo(sessionInfo);
+        producerInfo.setWindowSize(1024*4);
+        transport.oneway(producerInfo);
+        
+        dispatchContext = getDispatcher().register(this, name + "-client");
+        dispatchContext.requestDispatch();
+    }
+    
+    protected void initialize() {
+        Flow ouboundFlow = new Flow(name, false);
+        outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize,
outputResumeThreshold);
+        outputQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound",
outboundLimiter);
+        outboundController = outputQueue.getFlowController(ouboundFlow);
+
+        if (transport instanceof DispatchableTransport) {
+            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+
+                public void drain(MessageDelivery message, ISourceController<MessageDelivery>
controller) {
+                    write(message);
+                }
+            });
+
+        } else {
+            blockingTransport = true;
+            blockingWriter = Executors.newSingleThreadExecutor();
+            outputQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+                public void drain(final MessageDelivery message, ISourceController<MessageDelivery>
controller) {
+                    write(message);
+                };
+            });
+        }
+    }
+    
+    public void stop() throws Exception
+    {
+    	dispatchContext.close(false);
+    	super.stop();
+    }
+
+    
+    public void onCommand(Object command) {
+        try {
+            if (command.getClass() == ProducerAck.class) {
+                ProducerAck fc = (ProducerAck) command;
+                synchronized (outputQueue) {
+                    outboundLimiter.onProtocolCredit(fc.getSize());
+                }
+            } else {
+                onException(new Exception("Unrecognized command: " + command));
+            }
+        } catch (Exception e) {
+            onException(e);
+        }
+    }
+    
+	public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
+		dispatchContext.requestDispatch();
+	}
+
+	public boolean dispatch() {
+		while(true)
+		{
+			
+			if(next == null)
+			{
+	            int priority = this.priority;
+	            if (priorityMod > 0) {
+	                priority = counter % priorityMod == 0 ? 0 : priority;
+	            }
+	
+	            ActiveMQTextMessage msg = createMessage(producerInfo, activemqDestination, priority,
createPayload());
+                if (property != null) {
+                    try {
+                        msg.setStringProperty(property, property);
+                    } catch (JMSException e) {
+                        new RuntimeException(e);
+                    }
+                }
+	            next = new OpenWireMessageDelivery(msg);
+			}
+	        
+			//If flow controlled stop until flow control is lifted.
+			if(outboundController.isSinkBlocked())
+			{
+				if(outboundController.addUnblockListener(this))
+				{
+					return true;
+				}
+			}
+			
+	        getSink().add(next, null);
+	        rate.increment();
+	        next = null;
+		}
+	}
+
+    private String createPayload() {
+        if( payloadSize>=0 ) {
+            StringBuilder sb = new StringBuilder(payloadSize);
+            sb.append(name);
+            sb.append(':');
+            sb.append(++counter);
+            sb.append(':');
+            int length = sb.length();
+            if( length <= payloadSize ) {
+                sb.append(filler.subSequence(0, payloadSize-length));
+                return sb.toString();
+            } else {
+               return sb.substring(0, payloadSize); 
+            }
+        } else {
+            return name+":"+(++counter);
+        }
+    }
+	
+	public void setName(String name) {
+        this.name = name;
+    }
+
+    public AtomicLong getMessageIdGenerator() {
+        return messageIdGenerator;
+    }
+
+    public void setMessageIdGenerator(AtomicLong msgIdGenerator) {
+        this.messageIdGenerator = msgIdGenerator;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int msgPriority) {
+        this.priority = msgPriority;
+    }
+
+    public int getPriorityMod() {
+        return priorityMod;
+    }
+
+    public void setPriorityMod(int priorityMod) {
+        this.priorityMod = priorityMod;
+    }
+
+    public int getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(int producerId) {
+        this.producerId = producerId;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public String getProperty() {
+        return property;
+    }
+
+    public void setProperty(String property) {
+        this.property = property;
+    }
+
+    public MetricAggregator getTotalProducerRate() {
+        return totalProducerRate;
+    }
+
+    public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+        this.totalProducerRate = totalProducerRate;
+    }
+
+    public MetricCounter getRate() {
+        return rate;
+    }
+
+    public int getPayloadSize() {
+        return payloadSize;
+    }
+
+    public void setPayloadSize(int messageSize) {
+        this.payloadSize = messageSize;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+}
+



Mime
View raw message