activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r752964 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/
Date Thu, 12 Mar 2009 18:40:33 GMT
Author: chirino
Date: Thu Mar 12 18:40:32 2009
New Revision: 752964

URL: http://svn.apache.org/viewvc?rev=752964&view=rev
Log:
adding files I missed adding in the last commit.

Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=752964&r1=752963&r2=752964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
Thu Mar 12 18:40:32 2009
@@ -45,7 +45,7 @@
         return resourceName;
     }
 
-    protected void setResourceName(String resourceName) {
+    public void setResourceName(String resourceName) {
         this.resourceName = resourceName;
     }
 

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java?rev=752964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
(added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowManager.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Set;
+ 
+/**
+ * Registry of the {@link IFlowResource}s and {@link Flow}s known to this
+ * manager. Resources and flows are keyed by their numeric ids; flows are
+ * additionally indexed by name.
+ * <p>
+ * Every method that touches the underlying maps is synchronized on this
+ * instance, and the collection-returning accessors hand back snapshots, so
+ * callers can iterate over a result without holding the manager's lock.
+ */
+public class FlowManager
+{
+
+    protected long resourceCounter = 0;
+
+    private final HashMap<Long, IFlowResource> resources = new HashMap<Long, IFlowResource>();
+    private final HashMap<Long, Flow> flows = new HashMap<Long, Flow>();
+    private final HashMap<String, Flow> flowsByName = new HashMap<String, Flow>();
+
+    /**
+     * Registers a resource under the id it reports via
+     * {@code getResourceId()}, replacing any resource previously stored
+     * under that id.
+     *
+     * @param resource the resource to register
+     */
+    public synchronized void registerResource(IFlowResource resource)
+    {
+        resources.put(resource.getResourceId(), resource);
+    }
+
+    /**
+     * Returns the flow registered under {@code name}, creating and
+     * registering a new one if none exists yet.
+     *
+     * @param name    the flow name used as the lookup key
+     * @param dynamic passed to the {@link Flow} constructor when a new flow
+     *                is created; ignored when the flow already exists
+     * @return the existing or newly created flow
+     */
+    public synchronized Flow createFlow(String name, boolean dynamic)
+    {
+        //TODO Should assign the flow based off of the hashcode of the
+        //name, and handle collisions.
+        //TODO Also implement dynamic flows whereby resources participating
+        //in a flow register with it, so that flows can be expired.
+        Flow flow = getFlow(name);
+        if (flow == null)
+        {
+            flow = new Flow(name, dynamic);
+            flowsByName.put(name, flow);
+            flows.put(flow.getFlowID(), flow);
+        }
+        return flow;
+    }
+
+    /**
+     * @param id a resource id
+     * @return the resource registered under {@code id}, or null if none
+     */
+    public synchronized IFlowResource getResource(long id) {
+        return resources.get(id);
+    }
+
+    /**
+     * @param id a flow id
+     * @return the flow registered under {@code id}, or null if none
+     */
+    public synchronized Flow getFlow(long id) {
+        return flows.get(id);
+    }
+
+    /**
+     * @param name a flow name
+     * @return the flow registered under {@code name}, or null if none
+     */
+    public synchronized Flow getFlow(String name) {
+        return flowsByName.get(name);
+    }
+
+    /**
+     * Unregisters a resource.
+     *
+     * @param id the id of the resource to remove
+     * @return the removed resource, or null if none was registered
+     */
+    public synchronized IFlowResource removeResource(long id) {
+        // Synchronized like every other accessor of the resources map.
+        return resources.remove(id);
+    }
+
+    /**
+     * @return a snapshot of the currently registered flows; later changes to
+     *         the manager are not reflected in the returned collection
+     */
+    public synchronized Collection<Flow> getRegisteredFlows() {
+        // Copy under the lock: HashMap.values() is a live view whose
+        // iterators fail if the map is modified during iteration.
+        return new ArrayList<Flow>(flows.values());
+    }
+
+    /**
+     * @return a snapshot of the currently registered resources; later
+     *         changes to the manager are not reflected in the returned
+     *         collection
+     */
+    public synchronized Collection<IFlowResource> getRegisteredResources() {
+        return new ArrayList<IFlowResource>(resources.values());
+    }
+
+    /**
+     * Returns a list of flow resource ids. Used for tooling.
+     */
+    public synchronized ArrayList<Long> getRegisteredResourceIDs()
+    {
+        return new ArrayList<Long>(resources.keySet());
+    }
+
+    /**
+     * Returns a list of flow ids. Used for tooling.
+     */
+    public synchronized ArrayList<Long> getRegisteredFlowIDs()
+    {
+        return new ArrayList<Long>(flows.keySet());
+    }
+}
+    /*
+    private class FlowMetricsCollector
+        implements Runnable
+    {
+        private final long m_collectionInterval = 10000;
+
+        private Thread m_thread;
+
+        private boolean m_started;
+
+        FlowMetricsCollector()
+        {
+
+        }
+
+        public void start()
+        {
+            synchronized ( this )
+            {
+                if ( m_started )
+                {
+                    return;
+                }
+                m_thread = new Thread( this, "FlowMetricsCollector" );
+                m_started = true;
+                m_thread.start();
+            }
+        }
+
+        public void shutdown()
+            throws InterruptedException
+        {
+            synchronized ( this )
+            {
+                if ( !m_started )
+                {
+                    return;
+                }
+                m_thread.interrupt();
+
+                try
+                {
+                    m_thread.join();
+                }
+                finally
+                {
+                    m_thread = null;
+                    m_started = false;
+                }
+            }
+        }
+
+        public void run()
+        {
+            while ( !Thread.currentThread().isInterrupted() )
+            {
+                long startTime = System.currentTimeMillis();
+
+                Iterator metrics = getFlowMetrics().iterator();
+                while ( metrics.hasNext() )
+                {
+                    FlowMetrics fm = (FlowMetrics) metrics.next();
+                    fm.collect();
+                }
+
+                long endTime = System.currentTimeMillis();
+                long timeToNext = m_collectionInterval - ( endTime - startTime );
+
+                if ( timeToNext <= 0 )
+                {
+                    // TODO FLOWCONTROL should comment this out.
+                    System.out.println( "Unable to maintain specified flow metrics collection interval of "
+                        + m_collectionInterval + "ms, last collection took: " + ( endTime - startTime ) + "ms" );
+                }
+                else
+                {
+                    try
+                    {
+                        Thread.sleep( timeToNext );
+                    }
+                    catch ( InterruptedException ie )
+                    {
+                        return;
+                    }
+                }
+            }
+        }
+    }*/

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java?rev=752964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
(added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowRelay.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,5 @@
+package org.apache.activemq.flow;
+
+/**
+ * A flow element that is both an {@link IFlowSink} and an {@link IFlowSource}
+ * for elements of type {@code E}. It declares no members of its own.
+ *
+ * @param <E> the element type shared by the sink and source sides
+ */
+public interface IFlowRelay<E> extends IFlowSink<E>, IFlowSource<E> {
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java?rev=752964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ClientConnection.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,26 @@
+package org.apache.activemq.flow;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+/**
+ * Test connection that obtains its transport by connecting out to a
+ * configured URI instead of being handed an existing transport.
+ */
+public abstract class ClientConnection extends AbstractTestConnection{
+
+    // Target handed to TransportFactory.compositeConnect() in start().
+    private URI connectUri;
+    
+    /**
+     * Sets the URI that {@link #start()} connects to.
+     *
+     * @param uri the connect URI
+     */
+    public void setConnectUri(URI uri) {
+        this.connectUri = uri;
+    }
+
+    /**
+     * Connects a transport to the configured URI, registers this object as
+     * its listener, then runs the superclass setTransport / initialize /
+     * start sequence and finally writes {@code name} to the transport.
+     *
+     * @throws Exception propagated from connecting, initializing, starting
+     *                   or writing
+     */
+    public void start() throws Exception {
+        // NOTE(review): transport and name are not declared in this class;
+        // presumably inherited from AbstractTestConnection - confirm.
+        transport = TransportFactory.compositeConnect(connectUri);
+        transport.setTransportListener(this);
+        super.setTransport(transport);
+        super.initialize();
+        super.start();
+        // Let the remote side know our name.
+        write(name);
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java?rev=752964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
(added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockClient.java
Thu Mar 12 18:40:32 2009
@@ -0,0 +1,317 @@
+package org.apache.activemq.flow;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.util.IntrospectionSupport;
+
+public class MockClient {
+
+    protected int performanceSamples = 3;
+    protected int samplingFrequency = 5000;
+
+    protected int numProducers = 1;
+    protected int numConsumers = 1;
+    protected int destCount = 1;
+    protected int numPriorities = 1;
+    protected boolean useInputQueues = false;
+
+    // Set to mock up ptp:
+    protected boolean ptp = false;
+
+    protected String sendBrokerURI;
+    protected String receiveBrokerURI;
+
+    // Sets the number of threads to use:
+    protected int threadsPerDispatcher = Runtime.getRuntime().availableProcessors();
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+    protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
+    protected boolean includeDetailedRates = false;
+
+    protected IDispatcher dispatcher;
+
+    public RemoteConsumer consumer(int index) {
+        return consumers.get(index);
+    }
+
+    public RemoteProducer producer(int index) {
+        return producers.get(index);
+    }
+
+    public int getThreadsPerDispatcher() {
+        return threadsPerDispatcher;
+    }
+
+    public boolean isUseInputQueues() {
+        return useInputQueues;
+    }
+
+    public void setUseInputQueues(boolean useInputQueues) {
+        this.useInputQueues = useInputQueues;
+    }
+    
+    public void setThreadsPerDispatcher(int threadPoolSize) {
+        this.threadsPerDispatcher = threadPoolSize;
+    }
+    
+    public void setIncludeDetailedRates(boolean includeDetailedRates) {
+        this.includeDetailedRates = includeDetailedRates;
+    }
+
+    public boolean getIncludeDetailedRates() {
+        return includeDetailedRates;
+    }
+
+    public void includeInRateReport(RemoteProducer producer) {
+        additionalReportMetrics.add(producer.getRate());
+    }
+
+    public void includeInRateReport(RemoteConsumer consumer) {
+        additionalReportMetrics.add(consumer.getRate());
+    }
+    
+    public int getSamplingFrequency() {
+        return samplingFrequency;
+    }
+
+    public void setSamplingFrequency(int samplingFrequency) {
+        this.samplingFrequency = samplingFrequency;
+    }
+
+
+    public int getNumProducers() {
+        return numProducers;
+    }
+
+    public void setNumProducers(int numProducers) {
+        this.numProducers = numProducers;
+    }
+
+    public int getNumConsumers() {
+        return numConsumers;
+    }
+
+    public void setNumConsumers(int numConsumers) {
+        this.numConsumers = numConsumers;
+    }
+
+    public int getDestCount() {
+        return destCount;
+    }
+
+    public void setDestCount(int destCount) {
+        this.destCount = destCount;
+    }
+
+    public int getNumPriorities() {
+        return numPriorities;
+    }
+
+    public void setNumPriorities(int numPriorities) {
+        this.numPriorities = numPriorities;
+    }
+
+    public boolean isPtp() {
+        return ptp;
+    }
+
+    public void setPtp(boolean ptp) {
+        this.ptp = ptp;
+    }
+
+    public String getSendBrokerURI() {
+        return sendBrokerURI;
+    }
+
+    public void setSendBrokerURI(String sendBrokerURI) {
+        this.sendBrokerURI = sendBrokerURI;
+    }
+
+    public String getReceiveBrokerURI() {
+        return receiveBrokerURI;
+    }
+
+    public void setReceiveBrokerURI(String receiveBrokerURI) {
+        this.receiveBrokerURI = receiveBrokerURI;
+    }
+
+    public int getPerformanceSamples() {
+        return performanceSamples;
+    }
+
+    
+    protected final AtomicLong msgIdGenerator = new AtomicLong();
+
+    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+    private String testName;
+
+    /**
+     * Builds a RemoteConsumer named "consumer" + (i + 1) for the given
+     * destination, hands it the shared consumer-rate aggregator and the
+     * dispatcher, and appends it to the consumers list. The consumer is not
+     * started here.
+     *
+     * @param i           zero-based consumer index, used only to derive the name
+     * @param connectUri  URI string, parsed with {@code new URI(String)}
+     * @param destination destination assigned to the consumer
+     * @throws URISyntaxException if {@code connectUri} cannot be parsed
+     */
+    private void createConsumer(int i, String connectUri, Destination destination) throws URISyntaxException {
+        RemoteConsumer consumer = new RemoteConsumer();
+        consumer.setDestination(destination);
+        consumer.setName("consumer" + (i + 1));
+        consumer.setTotalConsumerRate(totalConsumerRate);
+        consumer.setDispatcher(dispatcher);
+        consumer.setConnectUri(new URI(connectUri));
+        consumer.setUseInputQueue(useInputQueues);
+        consumers.add(consumer);
+    }
+
+    /**
+     * Builds a RemoteProducer with producer id (id + 1) and name
+     * "producer" + (id + 1) for the given destination, hands it the shared
+     * message-id generator, producer-rate aggregator and dispatcher, and
+     * appends it to the producers list. The producer is not started here.
+     *
+     * @param id          zero-based producer index
+     * @param connectUri  URI string, parsed with {@code new URI(String)}
+     * @param destination destination assigned to the producer
+     * @throws URISyntaxException if {@code connectUri} cannot be parsed
+     */
+    private void createProducer(int id, String connectUri, Destination destination) throws URISyntaxException {
+        RemoteProducer producer = new RemoteProducer();
+        producer.setProducerId(id + 1);
+        producer.setName("producer" + (id + 1));
+        producer.setDestination(destination);
+        producer.setMessageIdGenerator(msgIdGenerator);
+        producer.setTotalProducerRate(totalProducerRate);
+        producer.setDispatcher(dispatcher);
+        producer.setUseInputQueue(useInputQueues);
+        producer.setConnectUri(new URI(connectUri));
+        producers.add(producer);
+    }
+
+    /**
+     * Prints the aggregate producer and consumer rates to stdout once per
+     * sampling interval, for performanceSamples intervals, and resets both
+     * aggregators after each report. Per-child summaries are printed as well
+     * when includeDetailedRates is set.
+     *
+     * @throws InterruptedException if interrupted while sleeping between samples
+     */
+    private void reportRates() throws InterruptedException {
+        String mode = ptp ? "ptp" : "topic";
+        System.out.println("Checking rates for test: " + getTestName() + ", " + mode);
+
+        int sample = 0;
+        while (sample < performanceSamples) {
+            // The period is created before the sleep and passed to every summary call below.
+            Period window = new Period();
+            Thread.sleep(samplingFrequency);
+
+            System.out.println(totalProducerRate.getRateSummary(window));
+            System.out.println(totalConsumerRate.getRateSummary(window));
+            if (includeDetailedRates) {
+                System.out.println(totalProducerRate.getChildRateSummary(window));
+                System.out.println(totalConsumerRate.getChildRateSummary(window));
+            }
+
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+            sample++;
+        }
+    }
+
+    public void setTestName(String testName) {
+        this.testName = testName;
+    }
+
+    public void setPerformanceSamples(int samples) {
+        this.performanceSamples = samples;
+    }
+
+    public String getTestName() {
+        return testName;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void runTest() throws Exception {
+        getDispatcher().start();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    private void startServices() throws Exception {
+        AbstractTestConnection.setInShutdown(false, dispatcher);
+        for (RemoteConsumer connection : consumers) {
+            connection.start();
+        }
+
+        for (RemoteProducer connection : producers) {
+            connection.start();
+        }
+    }
+
+    private void stopServices() throws Exception {
+
+        AbstractTestConnection.setInShutdown(true, dispatcher);
+        for (RemoteProducer connection : producers) {
+            connection.stop();
+        }
+        for (RemoteConsumer connection : consumers) {
+            connection.stop();
+        }
+    }
+
+    /**
+     * Creates destCount destinations named "dest1".."destN" (flagged ptp or
+     * not per the ptp setting), then numProducers producers against
+     * sendBrokerURI and numConsumers consumers against receiveBrokerURI.
+     * Producer/consumer i is bound to destination (i % destCount).
+     *
+     * @throws Exception if a broker URI cannot be parsed
+     */
+    public void createConnections() throws Exception {
+
+        DestinationBuffer[] destinations = new DestinationBuffer[destCount];
+        for (int d = 0; d < destCount; d++) {
+            DestinationBean template = new DestinationBean();
+            template.setName(new AsciiBuffer("dest" + (d + 1)));
+            template.setPtp(ptp);
+            destinations[d] = template.freeze();
+        }
+
+        for (int p = 0; p < numProducers; p++) {
+            createProducer(p, sendBrokerURI, destinations[p % destCount]);
+        }
+
+        for (int c = 0; c < numConsumers; c++) {
+            createConsumer(c, receiveBrokerURI, destinations[c % destCount]);
+        }
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    /**
+     * Returns the current dispatcher, first creating a "ClientDispatcher"
+     * priority dispatch pool from numPriorities and threadsPerDispatcher if
+     * no dispatcher has been set yet.
+     *
+     * @return the existing or newly created dispatcher
+     */
+    protected IDispatcher createDispatcher() {
+        if (dispatcher != null) {
+            return dispatcher;
+        }
+        dispatcher = PriorityDispatcher.createPriorityDispatchPool("ClientDispatcher", numPriorities, threadsPerDispatcher);
+        return dispatcher;
+    }
+
+    /**
+     * Runs the mock client as a standalone app.
+     *
+     * @param args
+     *            Optional. When present, args[0] is the path of a properties
+     *            file whose entries are applied to the MockClient through
+     *            IntrospectionSupport.setProperties before the test runs.
+     * @throws Exception
+     *             if the properties file cannot be read or the test fails.
+     */
+    public static void main(String[] args) throws Exception {
+        MockClient test = new MockClient();
+
+        if (args.length > 0) {
+            Properties props = new Properties();
+            // Properties.load() leaves the stream open, so close it here.
+            FileInputStream in = new FileInputStream(args[0]);
+            try {
+                props.load(in);
+            } finally {
+                in.close();
+            }
+            IntrospectionSupport.setProperties(test, props);
+        }
+
+        // Create the dispatcher only after the properties have been applied;
+        // otherwise numPriorities / threadsPerDispatcher from the file would
+        // never reach the dispatch pool.
+        test.createDispatcher();
+
+        System.out.println(IntrospectionSupport.toString(test));
+        try
+        {
+            // NOTE(review): runTest() also calls getDispatcher().start();
+            // confirm that starting the dispatcher twice is harmless.
+            test.getDispatcher().start();
+            test.createConnections();
+            test.runTest();
+        }
+        finally
+        {
+            test.getDispatcher().shutdown();
+        }
+    }
+
+}



Mime
View raw message