camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aka...@apache.org
Subject svn commit: r1022538 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/model/loadbalancer/ main/java/org/apache/camel/processor/loadbalancer/ test/java/org/apache/camel/processor/
Date Thu, 14 Oct 2010 13:59:56 GMT
Author: akarpe
Date: Thu Oct 14 13:59:55 2010
New Revision: 1022538

URL: http://svn.apache.org/viewvc?rev=1022538&view=rev
Log:
CAMEL-3197 - Added a new Load Balancing algorithm that provides weighted round-robin and weighted
random support

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
  (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java?rev=1022538&r1=1022537&r2=1022538&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
Thu Oct 14 13:59:55 2010
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -35,12 +36,16 @@ import org.apache.camel.model.loadbalanc
 import org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition;
 import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition;
+import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition;
 import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer;
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
 import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
 import org.apache.camel.processor.loadbalancer.StickyLoadBalancer;
 import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRandomLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRoundRobinLoadBalancer;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CollectionStringBuffer;
 
@@ -58,7 +63,8 @@ public class LoadBalanceDefinition exten
             @XmlElement(required = false, name = "random", type = RandomLoadBalancerDefinition.class),
             @XmlElement(required = false, name = "roundRobin", type = RoundRobinLoadBalancerDefinition.class),
             @XmlElement(required = false, name = "sticky", type = StickyLoadBalancerDefinition.class),
-            @XmlElement(required = false, name = "topic", type = TopicLoadBalancerDefinition.class)}
+            @XmlElement(required = false, name = "topic", type = TopicLoadBalancerDefinition.class),
+            @XmlElement(required = false, name = "weighted", type = WeightedLoadBalancerDefinition.class)}
     )
     private LoadBalancerDefinition loadBalancerType;
 
@@ -182,6 +188,24 @@ public class LoadBalanceDefinition exten
     }
 
     /**
+     * Uses weighted load balancer
+     *
+     * @param roundRobin               used to set the processor selection algorithm.
+     * @param distributionRatioList    ArrayList<Long> of weighted ratios for distribution
of messages.
+     * @return the builder
+     */
+    public LoadBalanceDefinition weighted(boolean roundRobin, ArrayList<Integer> distributionRatioList)
{
+        WeightedLoadBalancer weighted;
+        if (!roundRobin) {
+            weighted = new WeightedRandomLoadBalancer(distributionRatioList);
+        } else {
+            weighted = new WeightedRoundRobinLoadBalancer(distributionRatioList);
+        }
+        loadBalancerType = new LoadBalancerDefinition(weighted);
+        return this;
+    }
+    
+    /**
      * Uses round robin load balancer
      *
      * @return the builder

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.loadbalancer;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.model.LoadBalancerDefinition;
+import org.apache.camel.processor.loadbalancer.LoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRandomLoadBalancer;
+import org.apache.camel.processor.loadbalancer.WeightedRoundRobinLoadBalancer;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Represents an XML &lt;sticky/&gt; element
+ */
+@XmlRootElement(name = "weighted")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class WeightedLoadBalancerDefinition extends LoadBalancerDefinition {
+    
+    @XmlElement(name = "roundRobin", required = true)
+    private boolean roundRobin;
+    
+    @XmlElement(name = "distributionRatios", required = true)
+    private ArrayList<Integer> distributionRatioList;
+    
+    @Override
+    protected LoadBalancer createLoadBalancer(RouteContext routeContext) {
+        WeightedLoadBalancer loadBalancer = null;
+        
+        try {
+            if (!roundRobin) {
+                loadBalancer = new WeightedRandomLoadBalancer(distributionRatioList);
+            } else {
+                loadBalancer = new WeightedRoundRobinLoadBalancer(distributionRatioList);
+            }
+        } catch (Exception e) {
+            
+        }
+        return loadBalancer;
+    }
+
+    public boolean isRoundRobin() {
+        return roundRobin;
+    }
+
+    public void setRoundRobin(boolean roundRobin) {
+        this.roundRobin = roundRobin;
+    }
+
+    public ArrayList<Integer> getDistributionRatioList() {
+        return distributionRatioList;
+    }
+
+    public void setDistributionRatioList(ArrayList<Integer> distributionRatioList)
{
+        this.distributionRatioList = distributionRatioList;
+    }
+
+    @Override
+    public String toString() {
+        if (!roundRobin) { 
+            return "WeightedRandomLoadBalancer[" + distributionRatioList + "]";
+        } else {
+            return "WeightedRoundRobinLoadBalancer[" + distributionRatioList + "]";
+        }
+    }
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/WeightedLoadBalancerDefinition.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+public class DistributionRatio {
+    private int processorPosition;
+    private int distributionWeight;
+    private int runtimeWeight;
+    
+    public DistributionRatio(int processorPosition, int distributionWeight) {
+        super();
+        this.processorPosition = processorPosition;
+        this.distributionWeight = distributionWeight;
+        this.runtimeWeight = distributionWeight;
+    }
+
+    public DistributionRatio(int processorPosition, int distributionWeight, int runtimeWeight)
{
+        super();
+        this.processorPosition = processorPosition;
+        this.distributionWeight = distributionWeight;
+        this.runtimeWeight = runtimeWeight;
+    }
+    
+    public int getProcessorPosition() {
+        return processorPosition;
+    }
+
+    public void setProcessorPosition(int processorPosition) {
+        this.processorPosition = processorPosition;
+    }
+
+    public int getDistributionWeight() {
+        return distributionWeight;
+    }
+
+    public void setDistributionWeight(int distributionWeight) {
+        this.distributionWeight = distributionWeight;
+    }
+
+    public int getRuntimeWeight() {
+        return runtimeWeight;
+    }
+
+    public void setRuntimeWeight(int runtimeWeight) {
+        this.runtimeWeight = runtimeWeight;
+    }
+    
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/DistributionRatio.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class WeightedLoadBalancer extends QueueLoadBalancer {
+    private static final transient Log LOG = LogFactory.getLog(WeightedLoadBalancer.class);
+    private ArrayList<Integer> distributionRatioList = new ArrayList<Integer>();
+    private ArrayList<DistributionRatio> runtimeRatios = new ArrayList<DistributionRatio>();
+    
+    public WeightedLoadBalancer(ArrayList<Integer> distributionRatios) {
+        deepCloneDistributionRatios(distributionRatios);
+        loadRuntimeRatios(distributionRatios);
+    }
+    
+    protected void deepCloneDistributionRatios(ArrayList<Integer> distributionRatios)
{
+        for (Integer value : distributionRatios) {
+            this.distributionRatioList.add(value);
+        }
+    }
+    
+    protected void loadRuntimeRatios(ArrayList<Integer> distributionRatios) {
+        int position = 0;
+        
+        for (Integer value : distributionRatios) {
+            runtimeRatios.add(new DistributionRatio(position++, value.intValue()));
+        }
+    }
+
+    protected void normalizeDistributionListAgainstProcessors(List<Processor> processors)
{
+        if (processors.size() > getDistributionRatioList().size()) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn("Listed Load Balance Processors do not match distributionRatio.
Best Effort distribution will be attempted");
+                LOG.warn("Number of Processors: " + processors.size() + ". Number of DistibutionRatioList
elements: " + getDistributionRatioList().size());
+            }
+        } else if (processors.size() < getDistributionRatioList().size()) {
+            if (LOG.isWarnEnabled()) {
+                LOG.warn("Listed Load Balance Processors do not match distributionRatio.
Best Effort distribution will be attempted");
+                LOG.warn("Number of Processors: " + processors.size() + ". Number of DistibutionRatioList
elements: " + getDistributionRatioList().size());
+            }
+            for (int i = processors.size(); i < getDistributionRatioList().size(); i++)
{
+                getDistributionRatioList().set(i, 0);
+                getRuntimeRatios().remove(i);
+            }
+        }        
+    }
+    
+    protected boolean isRuntimeRatiosZeroed() {
+        boolean cleared = true;
+        
+        for (DistributionRatio runtimeRatio : runtimeRatios) {
+            if (runtimeRatio.getRuntimeWeight() > 0) {
+                cleared = false;
+            }
+        }        
+        return cleared; 
+    }
+    
+    protected void resetRuntimeRatios() {
+        for (DistributionRatio runtimeRatio : runtimeRatios) {
+            runtimeRatio.setRuntimeWeight(runtimeRatio.getDistributionWeight());
+        }
+    }
+
+    public ArrayList<Integer> getDistributionRatioList() {
+        return distributionRatioList;
+    }
+
+    public void setDistributionRatioList(ArrayList<Integer> distributionRatioList)
{
+        this.distributionRatioList = distributionRatioList;
+    }
+
+    public ArrayList<DistributionRatio> getRuntimeRatios() {
+        return runtimeRatios;
+    }
+
+    public void setRuntimeRatios(ArrayList<DistributionRatio> runtimeRatios) {
+        this.runtimeRatios = runtimeRatios;
+    }    
+    
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+public class WeightedRandomLoadBalancer extends WeightedLoadBalancer {
+    private int randomCounter;
+    
+    public WeightedRandomLoadBalancer(ArrayList<Integer> distributionRatios) {
+        super(distributionRatios);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.camel.processor.loadbalancer.QueueLoadBalancer#chooseProcessor(java.util.List,
org.apache.camel.Exchange)
+     */
+    @Override
+    protected Processor chooseProcessor(List<Processor> processors,
+            Exchange exchange) {
+        
+        normalizeDistributionListAgainstProcessors(processors);
+        
+        boolean found = false;
+        
+        while (!found) {
+            if (getRuntimeRatios().isEmpty())  {
+                loadRuntimeRatios(getDistributionRatioList());
+            }
+            
+            randomCounter = 0;
+            if (getRuntimeRatios().size() > 0) {
+                randomCounter = new Random().nextInt(getRuntimeRatios().size());
+            } 
+                
+            if (getRuntimeRatios().get(randomCounter).getRuntimeWeight() > 0) {
+                getRuntimeRatios().get(randomCounter).setRuntimeWeight((getRuntimeRatios().get(randomCounter).getRuntimeWeight())
- 1);
+                found = true;
+                break;
+            } else {
+                getRuntimeRatios().remove(randomCounter);
+            }
+        }
+
+        return processors.get(getRuntimeRatios().get(randomCounter).getProcessorPosition());
+    }
+    
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+public class WeightedRoundRobinLoadBalancer extends WeightedLoadBalancer {
+    private int counter;
+    
+    public WeightedRoundRobinLoadBalancer(ArrayList<Integer> distributionRatios) {
+        super(distributionRatios);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.camel.processor.loadbalancer.QueueLoadBalancer#chooseProcessor(java.util.List,
org.apache.camel.Exchange)
+     */
+    @Override
+    protected Processor chooseProcessor(List<Processor> processors,
+            Exchange exchange) {
+        
+        normalizeDistributionListAgainstProcessors(processors);
+            
+        if (isRuntimeRatiosZeroed())  {
+            resetRuntimeRatios();
+            counter = 0;
+        }
+        
+        boolean found = false;
+        
+        while (!found) {
+            if (counter >= getRuntimeRatios().size()) {
+                counter = 0;
+            }
+            
+            if (getRuntimeRatios().get(counter).getRuntimeWeight() > 0) {
+                getRuntimeRatios().get(counter).setRuntimeWeight((getRuntimeRatios().get(counter).getRuntimeWeight())
- 1);
+                found = true;
+                break;
+            } else {
+                counter++;
+            }
+        }
+       
+        return processors.get(counter++);
+    }
+    
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRoundRobinLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class WeightedRandomLoadBalanceTest extends ContextTestSupport {
+    protected MockEndpoint x;
+    protected MockEndpoint y;
+    protected MockEndpoint z;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        x = getMockEndpoint("mock:x");
+        y = getMockEndpoint("mock:y");
+        z = getMockEndpoint("mock:z");
+    }
+
+    
+    /* (non-Javadoc)
+     * @see org.apache.camel.ContextTestSupport#isUseRouteBuilder()
+     */
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testRandom() throws Exception {
+
+        x.expectedMessageCount(4);
+        y.expectedMessageCount(2);
+        z.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
+                distributionRatio.add(4);
+                distributionRatio.add(2);
+                distributionRatio.add(1);
+                
+                // START SNIPPET: example
+                from("direct:start").loadBalance().
+                weighted(false, distributionRatio).to("mock:x", "mock:y", "mock:z");
+                // END SNIPPET: example
+            }
+        });
+        context.start();
+        
+        sendMessages(1, 2, 3, 4, 5, 6, 7);
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRandom2() throws Exception {
+
+        x.expectedMessageCount(2);
+        y.expectedMessageCount(1);
+        z.expectedMessageCount(3);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
+                distributionRatio.add(2);
+                distributionRatio.add(1);
+                distributionRatio.add(3);
+                
+                // START SNIPPET: example
+                from("direct:start").loadBalance().
+                weighted(false, distributionRatio).to("mock:x", "mock:y", "mock:z");
+                // END SNIPPET: example
+            }
+        });
+        context.start();
+        
+        sendMessages(1, 2, 3, 4, 5, 6);
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRandomBulk() throws Exception {
+
+        x.expectedMessageCount(10);
+        y.expectedMessageCount(15);
+        z.expectedMessageCount(25);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
+                distributionRatio.add(2);
+                distributionRatio.add(3);
+                distributionRatio.add(5);
+                
+                // START SNIPPET: example
+                from("direct:start").loadBalance().
+                weighted(false, distributionRatio).to("mock:x", "mock:y", "mock:z");
+                // END SNIPPET: example
+            }
+        });
+        context.start();
+        
+        sendBulkMessages(50);
+        
+        assertMockEndpointsSatisfied();
+    }
+    
+    protected void sendBulkMessages(int number) {
+        for (int i = 0; i < number; i++) {
+            template.sendBodyAndHeader("direct:start", createTestMessage(i), "counter", i);
+        }
+    }
+    
+    protected void sendMessages(int... counters) {
+        for (int counter : counters) {
+            template.sendBodyAndHeader("direct:start", createTestMessage(counter), "counter",
counter);
+        }
+    }
+
+    private String createTestMessage(int counter) {
+        return "<message>" + counter + "</message>";
+    }
+
+    protected Object[] listOfMessages(int... counters) {
+        List<String> list = new ArrayList<String>(counters.length);
+        for (int counter : counters) {
+            list.add(createTestMessage(counter));
+        }
+        return list.toArray();
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRandomLoadBalanceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java?rev=1022538&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
Thu Oct 14 13:59:55 2010
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
+
+public class WeightedRoundRobinLoadBalanceTest extends ContextTestSupport {
+    protected MockEndpoint x;
+    protected MockEndpoint y;
+    protected MockEndpoint z;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        x = getMockEndpoint("mock:x");
+        y = getMockEndpoint("mock:y");
+        z = getMockEndpoint("mock:z");
+    }
+
+    
+    /* (non-Javadoc)
+     * @see org.apache.camel.ContextTestSupport#isUseRouteBuilder()
+     */
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testRoundRobin() throws Exception {
+
+        x.expectedMessageCount(5);
+        y.expectedMessageCount(2);
+        z.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
+                distributionRatio.add(4);
+                distributionRatio.add(2);
+                distributionRatio.add(1);
+                
+                // START SNIPPET: example
+                from("direct:start").loadBalance().
+                weighted(true, distributionRatio).to("mock:x", "mock:y", "mock:z");
+                // END SNIPPET: example
+            }
+        });
+        context.start();
+        
+        sendMessages(1, 2, 3, 4, 5, 6, 7, 8);
+        
+        assertMockEndpointsSatisfied();
+        x.expectedBodiesReceived(1, 4, 6, 7, 8);
+        y.expectedBodiesReceived(2, 5);
+        z.expectedBodiesReceived(3);
+    }
+
+    public void testRoundRobin2() throws Exception {
+
+        x.expectedMessageCount(3);
+        y.expectedMessageCount(1);
+        z.expectedMessageCount(3);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
+                distributionRatio.add(2);
+                distributionRatio.add(1);
+                distributionRatio.add(3);
+                
+                // START SNIPPET: example
+                from("direct:start").loadBalance().
+                weighted(true, distributionRatio).to("mock:x", "mock:y", "mock:z");
+                // END SNIPPET: example
+            }
+        });
+        context.start();
+        
+        sendMessages(1, 2, 3, 4, 5, 6, 7);
+        
+        assertMockEndpointsSatisfied();
+        x.expectedBodiesReceived(1, 4, 7);
+        y.expectedBodiesReceived(2);
+        z.expectedBodiesReceived(3, 5, 6);
+    }
+
+    public void testRoundRobinBulk() throws Exception {
+
+        x.expectedMessageCount(10);
+        y.expectedMessageCount(15);
+        z.expectedMessageCount(25);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                ArrayList<Integer> distributionRatio = new ArrayList<Integer>();
+                distributionRatio.add(2);
+                distributionRatio.add(3);
+                distributionRatio.add(5);
+                
+                // START SNIPPET: example
+                from("direct:start").loadBalance().
+                weighted(true, distributionRatio).to("mock:x", "mock:y", "mock:z");
+                // END SNIPPET: example
+            }
+        });
+        context.start();
+        
+        sendBulkMessages(50);
+        
+        assertMockEndpointsSatisfied();
+    }
+    
+    protected void sendBulkMessages(int number) {
+        for (int i = 0; i < number; i++) {
+            template.sendBodyAndHeader("direct:start", createTestMessage(i), "counter", i);
+        }
+    }
+    
+    protected void sendMessages(int... counters) {
+        for (int counter : counters) {
+            template.sendBodyAndHeader("direct:start", createTestMessage(counter), "counter",
counter);
+        }
+    }
+
+    private String createTestMessage(int counter) {
+        return "<message>" + counter + "</message>";
+    }
+
+    protected Object[] listOfMessages(int... counters) {
+        List<String> list = new ArrayList<String>(counters.length);
+        for (int counter : counters) {
+            list.add(createTestMessage(counter));
+        }
+        return list.toArray();
+    }
+    
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/WeightedRoundRobinLoadBalanceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message