camel-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: a Flow Relgulator Component
Date Tue, 13 Apr 2010 17:40:51 GMT
Hi

Thanks for your contribution. And the idea of using DelayedQueue is good.
Its something we will add to Camel error handler in the future to
offer non blocked waiting while waiting to do next redelivery.

In terms of dynamically throttle a route I believe Camel's RoutePolicy
is maybe more powerful
http://camel.apache.org/routepolicy.html

It allows you to avoid intaking the messages which then wont risk of
loosing messages if they are temporary stored in an in memory
DelayedQueue.



On Tue, Apr 13, 2010 at 3:45 PM, blau <bernard@laurant.fr> wrote:
>
>
> Hello, this is a little contribution to camel projet.
>
> Sometime you need to regulate (slow down) the flow of a stream.
> For example, imagine a marketdata flow where body of message is a
> Map<String, Object>. Update rate is far too important for your need
> (sometime more thant 20 mess/sec), so you want to regulate it at 1mess/sec.
>
> As messages are not delayed but aggregated, this flowregulator need a
> MessageAggregator strategy (implementation given by a factory).
>
> This implementation is based on the DelayQueue from java.util.concurrent.
>
> ex: from("timer://test?period=1000").to("flowregulator://test?period=5000");
>
> /**
>  * This camel component is able to regulate the flow of a route.
>  * <p>FlowRegulatorComponent uses a period parameter to trigger updates.
>  * <p>First input message is send immediately to the output.
>  * Others messages (if period is not over) are inserted into a <em>time
> pipeline</em> implemented with a DelayQueue, waiting for timeout to be send
> to output
>  * <p>If period is over, an incoming message will be send immediately to
> output.
>  *
>  * As messages are not delayed but aggregated, this flowregulator need a
> MessageAggregator implementation given by a factory.
>  *
>  * <p>example:
>  * <code>
>  * <p>camelContext.addComponent("flowregulator", new
> FlowRelgulatorComponent(new StringMessageAggregatorFactory()));
>  *
> <p>from("timer://test?period=1000").to("flowregulator://test?period=5000");
>  * <p>from("flowregulator://test?period=5000").to...
>  * </code>
>  * @author bernard LAURANT
>  */
> public class FlowRelgulatorComponent extends DefaultComponent {
>
>        /**
>         * the time pipeline
>         */
>        private DelayQueue<FlowRelgulatorEndPoint> flowRelgulatorEndPoints =
new
> DelayQueue<FlowRelgulatorEndPoint>();
>
>        /**
>         * default period is set to 1s
>         */
>        private long defaultPeriod = 1000;
>
>        private MessageAggregatorFactory messageAggregatorFactory;
>
>        public FlowRelgulatorComponent() {
>                super();
>        }
>
>        public FlowRelgulatorComponent(CamelContext camelContext) {
>                super(camelContext);
>        }
>
>        public FlowRelgulatorComponent(MessageAggregatorFactory
> messageAggregatorFactory) {
>                super();
>                this.messageAggregatorFactory = messageAggregatorFactory;
>        }
>
>        public FlowRelgulatorComponent(CamelContext camelContext,
> MessageAggregatorFactory messageAggregatorFactory) {
>                super(camelContext);
>                this.messageAggregatorFactory = messageAggregatorFactory;
>        }
>
>        @SuppressWarnings("unchecked")
>        @Override
>    protected Endpoint createEndpoint(String uri, String remaining, Map
> parameters) throws Exception {
>                Long period = (Long)getAndRemoveParameter(parameters, "period",
> Long.class);
>                if (period == null) {
>                        period = defaultPeriod;
>                }
>                FlowRelgulatorEndPoint flowRelgulatorEndPoint = new
> FlowRelgulatorEndPoint(uri, this, remaining);
>                flowRelgulatorEndPoint.setPeriod(period);
>
> flowRelgulatorEndPoint.setAggregator(messageAggregatorFactory.createMessageAggregator());
>                return flowRelgulatorEndPoint;
>    }
>
>        public void setDefaultPeriod(long defaultPeriod) {
>                this.defaultPeriod = defaultPeriod;
>        }
>
>        public void pipeIn(FlowRelgulatorEndPoint flowRelgulatorEndPoint) {
>                synchronized (flowRelgulatorEndPoints) {
>                        if (!flowRelgulatorEndPoints.contains(flowRelgulatorEndPoint))
{
>                                flowRelgulatorEndPoints.put(flowRelgulatorEndPoint);
>                        }
>                }
>        }
>
>        @Override
>        public void start() throws Exception {
>                super.start();
>                Thread pipeOut = new Thread(new Runnable() {
>                        @Override
>                        public void run() {
>                                while (true) {
>                                        try {
>                                                FlowRelgulatorEndPoint
flowRelgulatorEndPoint =
> flowRelgulatorEndPoints.poll(200, TimeUnit.MILLISECONDS);
>                                                if (flowRelgulatorEndPoint
!= null) {
>                                                        flowRelgulatorEndPoint.flush();
>                                                }
>                                        } catch (InterruptedException
e) {
>                                        }
>                                }
>                        }
>                });
>                pipeOut.start();
>        }
>
>        public void setMessageAggregatorFactory(MessageAggregatorFactory
> messageAggregatorFactory) {
>                this.messageAggregatorFactory = messageAggregatorFactory;
>        }
> }
>
>
>
>
> /**
>  * Endpoint for FlowRegulator.
>  *
>  * @see FlowRegulatorComponent
>  * @author bernard LAURANT
>  */
> public class FlowRelgulatorEndPoint extends DefaultEndpoint implements
> Delayed {
>
>        private MessageAggregator messageAggregator;
>        private ReguledConsumer reguledConsumer;
>
>        private String remaining;
>        private long period;
>        private long timeOut;
>        private long lastMessSent;
>
>        public FlowRelgulatorEndPoint(String uri, FlowRelgulatorComponent
> flowRelgulatorComponent, String remaining) {
>                super(uri, flowRelgulatorComponent);
>                this.remaining = remaining;
>        }
>
>        @Override
>        public String toString() {
>                return createEndpointUri();
>        }
>
>        @Override
>        protected String createEndpointUri() {
>                return new
> StringBuilder().append("flowregulator://").append(remaining).append("?period=").append(period).toString();
>        }
>
>        @Override
>        public long getDelay(TimeUnit unit) {
>                long millis2PipeOut = -(System.currentTimeMillis() - timeOut);
>                return unit.convert(millis2PipeOut, TimeUnit.MILLISECONDS);
>        }
>
>        @Override
>        public int compareTo(Delayed o) {
>                long thisDelay = getDelay(TimeUnit.MILLISECONDS);
>                long anotherDelay = o.getDelay(TimeUnit.MILLISECONDS);
>                return (thisDelay<anotherDelay ? -1 : (thisDelay==anotherDelay
? 0 : 1));
>        }
>
>        @Override
>        public int hashCode() {
>                final int prime = 31;
>                int result = super.hashCode();
>                result = prime * result + (int) (period ^ (period >>>
32));
>                result = prime * result + ((remaining == null) ? 0 :
> remaining.hashCode());
>                return result;
>        }
>
>        @Override
>        public boolean equals(Object obj) {
>                if (this == obj)
>                        return true;
>                if (!super.equals(obj))
>                        return false;
>                if (getClass() != obj.getClass())
>                        return false;
>                FlowRelgulatorEndPoint other = (FlowRelgulatorEndPoint) obj;
>                if (remaining == null) {
>                        if (other.remaining != null)
>                                return false;
>                } else if (!remaining.equals(other.remaining))
>                        return false;
>                if (period != other.period)
>                        return false;
>                return true;
>        }
>
>
>        @Override
>        public Producer createProducer() throws Exception {
>                return new DefaultProducer(this) {
>                        @Override
>                        public void process(Exchange exchange) throws Exception
{
>                                messageAggregator.agregateMessage(exchange.getIn());
>                                if (lastMessSent == 0) {
>                                        flush();// first msg: send
it immediately
>                                } else {
>                                        timeOut = lastMessSent + period;
// timeout to pipeout the msg
>                                        if (System.currentTimeMillis()
>= timeOut) {
>                                                flush(); // send
it immediately, because timeout has expired
>                                        } else {
>                                                // put it in the
time pipeline
>
> ((FlowRelgulatorComponent)FlowRelgulatorEndPoint.this.getComponent()).pipeIn(FlowRelgulatorEndPoint.this);
>                                        }
>                                }
>                        }
>                };
>        }
>
>        @Override
>        public Consumer createConsumer(Processor processor) throws Exception {
>                if (reguledConsumer == null) {
>                        reguledConsumer = new ReguledConsumer(this, processor);
>                }
>                return reguledConsumer;
>        }
>
>        public void flush() {
>                reguledConsumer.send();
>                lastMessSent = System.currentTimeMillis();
>        }
>
>        class ReguledConsumer extends DefaultConsumer {
>
>                public ReguledConsumer(Endpoint endPoint, Processor processor)
{
>                        super(endPoint, processor);
>                }
>
>                public void send() {
>                        Exchange exchange = getEndpoint().createExchange();
>                        exchange.setIn(messageAggregator.getAndClearMessage());
>                        try {
>                                getProcessor().process(exchange);
>                        } catch (Exception e) {
>                                handleException(e);
>                        }
>                }
>        }
>
>        @Override
>        public boolean isSingleton() {
>                return true;
>        }
>
>        public void setPeriod(long period) {
>                this.period = period;
>        }
>
>        public void setAggregator(MessageAggregator messageAggregator) {
>                this.messageAggregator = messageAggregator;
>        }
> }
>
>
> The MessageAgregator interface :
>
> public interface MessageAggregatorFactory {
>
>        MessageAggregator createMessageAggregator();
> }
>
> public interface MessageAggregator {
>        void agregateMessage(Message message);
>        void clearMessage();
>        Message getMessage();
>        Message getAndClearMessage();
> }
>
> Exemple for a Map :
>
> public class MapMessageAggregatorFactory implements MessageAggregatorFactory
> {
>
>        @Override
>        public MessageAggregator createMessageAggregator() {
>                return new MessageAggregator() {
>
>                        Message data;
>
>                        @Override
>                        public synchronized Message getMessage() {
>                                return data;
>                        }
>
>                        @Override
>                        public synchronized void clearMessage() {
>                                data = null;
>                        }
>
>                        @Override
>                        public synchronized Message getAndClearMessage() {
>                                Message res = getMessage();
>                                clearMessage();
>                                return res;
>                        }
>
>                        @SuppressWarnings("unchecked")
>                        @Override
>                        public synchronized void agregateMessage(Message message)
{
>                                if (message == null)
>                                        return;
>                                if (data == null) {
>                                        data = message;
>                                } else {
>                                        Map newData = (Map) message.getBody();
>                                        ((Map)data.getBody()).putAll(newData);
>                                }
>                        }
>                };
>        }
> }
>
> exemple for a string :
>
> public class StringMessageAggregatorFactory implements
> MessageAggregatorFactory {
>
>        @Override
>        public MessageAggregator createMessageAggregator() {
>                return new MessageAggregator() {
>
>                        Message data;
>
>                        @Override
>                        public synchronized Message getMessage() {
>                                return data;
>                        }
>
>                        @Override
>                        public synchronized void clearMessage() {
>                                data.setBody("");
>                        }
>
>                        @Override
>                        public synchronized void agregateMessage(Message message)
{
>                                if (data == null) {
>                                        data = message;
>                                } else {
>                                        data.setBody((String)data.getBody()
+ message.getBody());
>                                        data.getHeaders().putAll(message.getHeaders());
>                                }
>                        }
>
>                        @Override
>                        public synchronized Message getAndClearMessage() {
>                                Message res = data;
>                                data = null;
>                                return res;
>                        }
>                };
>        }
> }
>
> And a simple test :
>
> public class TestFlow {
>
>        static CamelContext camelContext;
>        static int counter = 0;
>
>        public static void main(String[] args) throws Exception {
>                camelContext = new DefaultCamelContext();
>                camelContext.addComponent("flowregulator", new FlowRelgulatorComponent(new
> StringMessageAggregatorFactory()));
>
>                camelContext.addRoutes(new RouteBuilder() {
>                        @Override
>                        public void configure() throws Exception {
>                                from("timer://test?period=1000")
>                                .process(new Processor() {
>                                        @Override
>                                        public void process(Exchange
exchange) throws Exception {
>                                                System.out.println("timer
: " + exchange.getIn().getHeaders());
>                                                Message msg =
exchange.getIn();
>                                                msg.setBody(Integer.toString(counter++));
>                                                exchange.setOut(msg);
>                                        }
>                                })
>                                .to("flowregulator://test?period=5000");
>
>                                from("flowregulator://test?period=5000").process(new
Processor() {
>                                        @Override
>                                        public void process(Exchange
exchange) throws Exception {
>                                                System.out.println("flowReg:
" + exchange.getIn() + " " +
> exchange.getIn().getHeaders());
>                                        }
>                                });
>                        }
>                });
>                camelContext.start();
>        }
>
> }
>
>
>
> --
> View this message in context: http://old.nabble.com/a-Flow-Relgulator-Component-tp28230406p28230406.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Mime
View raw message