activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r532654 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel: ./ component/processor/ impl/ processor/loadbalancer/
Date Thu, 26 Apr 2007 08:24:07 GMT
Author: jstrachan
Date: Thu Apr 26 01:24:06 2007
New Revision: 532654

URL: http://svn.apache.org/viewvc?view=rev&rev=532654
Log:
added some load balancer helper classes together with the basics of a processor component
for turning a Processor into an Endpoint (so to turn say a transformer into an endpoint)

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?view=diff&rev=532654&r1=532653&r2=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Thu Apr
26 01:24:06 2007
@@ -129,4 +129,24 @@
      * Returns the injector used to instantiate objects by type
      */
     Injector getInjector();
+
+    /**
+     * Adds the endpoint to the context using the given URI
+     *
+     * @param uri the URI to be used to resolve this endpoint
+     * @param endpoint the endpoint to be added to the context
+     * @return the old endpoint that was previously registered to the context if there was
+     * already an endpoint for that URI
+     * @throws Exception if the new endpoint could not be started or the old endpoint could
not be stopped
+     */
+    Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception;
+
+    /**
+     * Removes the endpoint with the given URI
+     *
+     * @param uri the URI to be used to remove
+     * @return the endpoint that was removed or null if there is no endpoint for this URI
+     * @throws Exception if endpoint could not be stopped
+     */
+    Endpoint removeEndpoint(String uri) throws Exception;
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Consumer;
+import org.apache.camel.Producer;
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.processor.loadbalancer.LoadBalancer;
+
+/**
+ * A base class for creating {@link Endpoint} implementations from a {@link Processor}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ProcessorEndpoint extends DefaultEndpoint<Exchange> {
+    private final Processor<Exchange> processor;
+    private final LoadBalancer<Exchange> loadBalancer;
+
+    protected ProcessorEndpoint(String endpointUri, Component component, Processor<Exchange>
processor, LoadBalancer<Exchange> loadBalancer) {
+        super(endpointUri, component);
+        this.processor = processor;
+        this.loadBalancer = loadBalancer;
+    }
+
+    public Exchange createExchange() {
+        return new DefaultExchange(getContext());
+    }
+
+    public Producer<Exchange> createProducer() throws Exception {
+        return startService(new DefaultProducer<Exchange>(this) {
+            public void process(Exchange exchange) {
+                onExchange(exchange);
+            }
+        });
+    }
+
+    public Consumer<Exchange> createConsumer(Processor<Exchange> processor) throws
Exception {
+        return startService(new ProcessorEndpointConsumer(this, processor));
+    }
+
+    public Processor<Exchange> getProcessor() {
+        return processor;
+    }
+
+    public LoadBalancer<Exchange> getLoadBalancer() {
+        return loadBalancer;
+    }
+
+    protected void onExchange(Exchange exchange) {
+        processor.process(exchange);
+
+        // now lets output to the load balancer
+        loadBalancer.process(exchange);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+/**
+ * @version $Revision: 1.1 $
+*/
+public class ProcessorEndpointConsumer extends DefaultConsumer<Exchange> {
+    private final ProcessorEndpoint endpoint;
+
+    public ProcessorEndpointConsumer(ProcessorEndpoint endpoint, Processor<Exchange>
processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        endpoint.getLoadBalancer().addProcessor(getProcessor());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        endpoint.getLoadBalancer().removeProcessor(getProcessor());
+        super.doStop();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,26 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A component to make it easy to turn a <a href="http://activemq.apache.org/camel/processor.html">Processor</a>
into a fully fledged
+<a href="http://activemq.apache.org/camel/endpoint.html">Endpoint</a>
+
+</body>
+</html>

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?view=diff&rev=532654&r1=532653&r2=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
Thu Apr 26 01:24:06 2007
@@ -17,6 +17,8 @@
  */
 package org.apache.camel.impl;
 
+import static org.apache.camel.util.ServiceHelper.stopServices;
+import static org.apache.camel.util.ServiceHelper.startServices;
 import org.apache.camel.*;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.converter.DefaultTypeConverter;
@@ -78,7 +80,7 @@
 					if( isStarted() ) {
 						// If the component is looked up after the context is started,
 						// lets start it up.
-						ServiceHelper.startServices(component);
+						startServices(component);
 					}
 				} catch (Exception e) {
 					throw new RuntimeCamelException("Could not auto create component: "+name, e);
@@ -137,6 +139,26 @@
         }
     }
 
+    public Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception {
+        Endpoint oldEndpoint;
+        synchronized (endpoints) {
+            startServices(endpoint);
+            oldEndpoint = endpoints.remove(uri);
+            endpoints.put(uri, endpoint);
+            stopServices(oldEndpoint);
+        }
+        return oldEndpoint;
+    }
+
+    public Endpoint removeEndpoint(String uri) throws Exception {
+        Endpoint oldEndpoint;
+        synchronized (endpoints) {
+            oldEndpoint = endpoints.remove(uri);
+            stopServices(oldEndpoint);
+        }
+        return oldEndpoint;
+    }
+
     /**
      * Resolves the given URI to an endpoint
      */
@@ -164,7 +186,7 @@
                     // HC: What's the idea behind starting an endpoint?
                     // I don't think we have any endpoints that are services do we?
                     if (answer != null) {
-                        ServiceHelper.startServices(answer);
+                        startServices(answer);
                         endpoints.put(uri, answer);
                     }
                 }
@@ -280,7 +302,7 @@
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(servicesToClose);
+        stopServices(servicesToClose);
     }
 
     /**

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * A strategy for load balancing across a number of {@link Processor} instances
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface LoadBalancer<E extends Exchange> extends Processor<E> {
+    /**
+     * Adds a new processor to the load balancer
+     *
+     * @param processor the processor to be added to the load balancer
+     */
+    void addProcessor(Processor<E> processor);
+
+    /**
+     * Removes the given processor from the load balancer
+     *
+     * @param processor the processor to be removed from the load balancer
+     */
+    void removeProcessor(Processor<E> processor);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * A default base class for a {@link LoadBalancer} implementation
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class LoadBalancerSupport<E extends Exchange> implements LoadBalancer<E>
{
+    private List<Processor<E>> processors = new CopyOnWriteArrayList<Processor<E>>();
+
+    public void addProcessor(Processor<E> processor) {
+        processors.add(processor);
+    }
+
+    public void removeProcessor(Processor<E> processor) {
+        processors.remove(processor);
+    }
+
+    /**
+     * Returns the current processors available to this load balancer
+     *
+     * @return the processors available
+     */
+    public List<Processor<E>> getProcessors() {
+        return processors;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+
+/**
+ * A base class for {@link LoadBalancer} implementations which choose a single destination
for each exchange
+ * (rather like JMS Queues)
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class QueueLoadBalancer<E extends Exchange> extends LoadBalancerSupport<E>
{
+
+    public void process(E exchange) {
+        List<Processor<E>> list = getProcessors();
+        if (list.isEmpty()) {
+            throw new IllegalStateException("No processors available to process " + exchange);
+        }
+        Processor<E> processor = chooseProcessor(list, exchange);
+        if (processor == null) {
+            throw new IllegalStateException("No processors could be chosen to process " +
exchange);
+        }
+        else {
+            processor.process(exchange);
+        }
+    }
+
+    protected abstract Processor<E> chooseProcessor(List<Processor<E>>
processors, E exchange);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+
+/**
+ * Implements the random load balancing policy
+ *
+ * @version $Revision: 1.1 $
+ */
+public class RandomLoadBalancer<E extends Exchange> extends QueueLoadBalancer<E>
{
+
+    protected synchronized Processor<E> chooseProcessor(List<Processor<E>>
processors, E exchange) {
+        int size = processors.size();
+        while (true) {
+            int index = (int) Math.round(Math.random() * size);
+            if (index < size) {
+                return processors.get(index);
+            }
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+
+/**
+ * Implements the round robin load balancing policy
+ *
+ * @version $Revision: 1.1 $
+ */
+public class RoundRobinLoadBalancer<E extends Exchange> extends QueueLoadBalancer<E>
{
+    private int counter = -1;
+
+    protected synchronized Processor<E> chooseProcessor(List<Processor<E>>
processors, E exchange) {
+        int size = processors.size();
+        if (++counter >= size) {
+            counter = 0;
+        }
+        return processors.get(counter);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
+
+/**
+ * Implements a sticky load balancer using an {@link Expression} to calculate
+ * a correlation key to perform the sticky load balancing; rather like jsessionid in the
web
+ * or JMSXGroupID in JMS.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class StickyLoadBalancer<E extends Exchange> extends QueueLoadBalancer<E>
{
+    private Expression<E> correlationExpression;
+    private QueueLoadBalancer loadBalancer;
+    private int numberOfHashGroups = 64 * 1024;
+    private Map<Object, Processor<E>> stickyMap = new HashMap<Object, Processor<E>>();
+
+    public StickyLoadBalancer(Expression<E> correlationExpression) {
+        this(correlationExpression, new RoundRobinLoadBalancer());
+    }
+
+    public StickyLoadBalancer(Expression<E> correlationExpression, QueueLoadBalancer
loadBalancer) {
+        this.correlationExpression = correlationExpression;
+        this.loadBalancer = loadBalancer;
+    }
+
+    protected synchronized Processor<E> chooseProcessor(List<Processor<E>>
processors, E exchange) {
+        Object value = correlationExpression.evaluate(exchange);
+        Object key = getStickyKey(value);
+
+        Processor<E> processor;
+        synchronized (stickyMap) {
+            processor = stickyMap.get(key);
+            if (processor == null) {
+                processor = loadBalancer.chooseProcessor(processors, exchange);
+                stickyMap.put(key, processor);
+            }
+        }
+        return processor;
+    }
+
+    @Override
+    public void removeProcessor(Processor<E> processor) {
+        synchronized (stickyMap) {
+            Iterator<Map.Entry<Object,Processor<E>>> iter = stickyMap.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<Object, Processor<E>> entry = iter.next();
+                if (processor.equals(entry.getValue())) {
+                    iter.remove();
+                }
+            }
+        }
+        super.removeProcessor(processor);
+    }
+
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public int getNumberOfHashGroups() {
+        return numberOfHashGroups;
+    }
+
+    public void setNumberOfHashGroups(int numberOfHashGroups) {
+        this.numberOfHashGroups = numberOfHashGroups;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    /**
+     * A strategy to create the key for the sticky load balancing map.
+     * The default implementation uses the hash code of the value
+     * then modulos by the numberOfHashGroups to avoid the sticky map getting too big
+     *
+     * @param value the correlation value
+     * @return the key to be used in the sticky map
+     */
+    protected Object getStickyKey(Object value) {
+        int hashCode = 37;
+        if (value != null) {
+            hashCode = value.hashCode();
+        }
+        if (numberOfHashGroups > 0) {
+            hashCode = hashCode % numberOfHashGroups;
+        }
+        return hashCode;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.Pipeline;
+
+import java.util.List;
+
+/**
+ * A {@link LoadBalancer} implementations which sends to all destinations (rather like JMS
Topics)
+ *
+ * @version $Revision: 1.1 $
+ */
+public class TopicLoadBalancer<E extends Exchange> extends LoadBalancerSupport<E>
{
+    public void process(E exchange) {
+        List<Processor<E>> list = getProcessors();
+        for (Processor<E> processor : list) {
+            E copy = copyExchangeStrategy(processor, exchange);
+            processor.process(copy);
+        }
+    }
+
+    /**
+     * Strategy method to copy the exchange before sending to another endpoint. Derived classes
such as the
+     * {@link Pipeline} will not clone the exchange
+     *
+     * @param processor the processor that will send the exchange
+     * @param exchange @return the current exchange if no copying is required such as for
a pipeline otherwise a new copy of the exchange is returned.
+     */
+    protected E copyExchangeStrategy(Processor<E> processor, E exchange) {
+        return (E) exchange.copy();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html?view=auto&rev=532654
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Various load balancer processors
+
+</body>
+</html>

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message