camel-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: camel git commit: Fix for https://issues.apache.org/jira/browse/CAMEL-5113 Parallel and fault tolerant message processing for SQS endpoints
Date Fri, 09 Jan 2015 06:34:01 GMT
Hi

Remember to add the @UriParam annotation when you add new options to a component / endpoint.
And also document the new options in the wiki documentation.

And also remember to remove @author tags from javadoc in the source code.


On Fri, Jan 9, 2015 at 12:31 AM,  <ceposta@apache.org> wrote:
> Repository: camel
> Updated Branches:
>   refs/heads/master 64d08a7e3 -> b80021a15
>
>
> Fix for https://issues.apache.org/jira/browse/CAMEL-5113 Parallel and fault tolerant message processing for SQS endpoints
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b80021a1
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b80021a1
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b80021a1
>
> Branch: refs/heads/master
> Commit: b80021a1551213e155c4ec8b1464831e9a6ab1d3
> Parents: 64d08a7
> Author: Christian Posta <christian.posta@gmail.com>
> Authored: Thu Jan 8 16:31:38 2015 -0700
> Committer: Christian Posta <christian.posta@gmail.com>
> Committed: Thu Jan 8 16:31:38 2015 -0700
>
> ----------------------------------------------------------------------
>  .../DefaultScheduledPollConsumerScheduler.java  |  47 ++++++---
>  .../component/aws/sqs/SqsConfiguration.java     |  10 ++
>  .../camel/component/aws/sqs/SqsEndpoint.java    |   4 +
>  .../aws/sqs/SqsConcurrentConsumerTest.java      | 100 +++++++++++++++++++
>  4 files changed, 148 insertions(+), 13 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
> index 729ee75..db4e4d1 100644
> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
> @@ -16,6 +16,8 @@
>   */
>  package org.apache.camel.impl;
>
> +import java.util.ArrayList;
> +import java.util.List;
>  import java.util.Locale;
>  import java.util.concurrent.ScheduledExecutorService;
>  import java.util.concurrent.ScheduledFuture;
> @@ -38,8 +40,9 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
>      private Consumer consumer;
>      private ScheduledExecutorService scheduledExecutorService;
>      private boolean shutdownExecutor;
> -    private volatile ScheduledFuture<?> future;
> +    private volatile List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>();
>      private Runnable task;
> +    private int concurrentTasks = 1;
>
>      private long initialDelay = 1000;
>      private long delay = 500;
> @@ -94,6 +97,14 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
>          this.scheduledExecutorService = scheduledExecutorService;
>      }
>
> +    public int getConcurrentTasks() {
> +        return concurrentTasks;
> +    }
> +
> +    public void setConcurrentTasks(int concurrentTasks) {
> +        this.concurrentTasks = concurrentTasks;
> +    }
> +
>      @Override
>      public void onInit(Consumer consumer) {
>          this.consumer = consumer;
> @@ -106,34 +117,41 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
>
>      @Override
>      public void unscheduleTask() {
> -        if (future != null) {
> -            future.cancel(false);
> +        if (isSchedulerStarted()) {
> +            for (ScheduledFuture<?> future : futures) {
> +                future.cancel(true);
> +            }
> +            futures.clear();
>          }
>      }
>
>      @Override
>      public void startScheduler() {
>          // only schedule task if we have not already done that
> -        if (future == null) {
> +        if (futures.size() == 0) {
>              if (isUseFixedDelay()) {
>                  if (LOG.isDebugEnabled()) {
>                      LOG.debug("Scheduling poll (fixed delay) with initialDelay: {},
delay: {} ({}) for: {}",
>                              new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH),
consumer.getEndpoint()});
>                  }
> -                future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(),
getDelay(), getTimeUnit());
> +                for (int i = 0; i < concurrentTasks; i++) {
> +                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(task,
getInitialDelay(), getDelay(), getTimeUnit()));
> +                }
>              } else {
>                  if (LOG.isDebugEnabled()) {
>                      LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay:
{} ({}) for: {}",
>                              new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH),
consumer.getEndpoint()});
>                  }
> -                future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(),
getDelay(), getTimeUnit());
> +                for (int i = 0; i < concurrentTasks; i++) {
> +                    futures.add(scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(),
getDelay(), getTimeUnit()));
> +                }
>              }
>          }
>      }
>
>      @Override
>      public boolean isSchedulerStarted() {
> -        return future != null;
> +        return futures != null && futures.size() > 0;
>      }
>
>      @Override
> @@ -146,7 +164,7 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
>          if (scheduledExecutorService == null) {
>              // we only need one thread in the pool to schedule this task
>              this.scheduledExecutorService = getCamelContext().getExecutorServiceManager()
> -                    .newSingleThreadScheduledExecutor(consumer, consumer.getEndpoint().getEndpointUri());
> +                    .newScheduledThreadPool(consumer, consumer.getEndpoint().getEndpointUri(),
concurrentTasks);
>              // and we should shutdown the thread pool when no longer needed
>              this.shutdownExecutor = true;
>          }
> @@ -154,17 +172,20 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
>
>      @Override
>      protected void doStop() throws Exception {
> -        if (future != null) {
> -            LOG.debug("This consumer is stopping, so cancelling scheduled task: " +
future);
> -            future.cancel(true);
> -            future = null;
> +        if (isSchedulerStarted()) {
> +            LOG.debug("This consumer is stopping, so cancelling scheduled task: " +
futures);
> +            for (ScheduledFuture<?> future : futures) {
> +                future.cancel(true);
> +            }
> +            futures.clear();
>          }
>
>          if (shutdownExecutor && scheduledExecutorService != null) {
>              getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
>              scheduledExecutorService = null;
> -            future = null;
> +            futures.clear();
>          }
>      }
>
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
> index 07a9ff2..da6f3b4 100644
> --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
> +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
> @@ -58,6 +58,7 @@ public class SqsConfiguration {
>      private Integer defaultVisibilityTimeout;
>      @UriParam(defaultValue = "false")
>      private Boolean extendMessageVisibility = Boolean.FALSE;
> +    private Integer concurrentConsumers = 1;
>
>      // producer properties
>      @UriParam
> @@ -245,6 +246,14 @@ public class SqsConfiguration {
>          this.region = region;
>      }
>
> +    public Integer getConcurrentConsumers() {
> +        return concurrentConsumers;
> +    }
> +
> +    public void setConcurrentConsumers(Integer concurrentConsumers) {
> +        this.concurrentConsumers = concurrentConsumers;
> +    }
> +
>      @Override
>      public String toString() {
>          return "SqsConfiguration[queueName=" + queueName
> @@ -266,6 +275,7 @@ public class SqsConfiguration {
>              + ", redrivePolicy=" + redrivePolicy
>              + ", extendMessageVisibility=" + extendMessageVisibility
>              + ", queueOwnerAWSAccountId=" + queueOwnerAWSAccountId
> +            + ", concurrentConsumers=" + concurrentConsumers
>              + ", region=" + region
>              + "]";
>      }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
> index d2cc213..4845dd0 100644
> --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
> +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
> @@ -38,6 +38,7 @@ import org.apache.camel.Message;
>  import org.apache.camel.Processor;
>  import org.apache.camel.Producer;
>  import org.apache.camel.impl.DefaultExchange;
> +import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler;
>  import org.apache.camel.impl.ScheduledPollEndpoint;
>  import org.apache.camel.spi.HeaderFilterStrategy;
>  import org.apache.camel.spi.HeaderFilterStrategyAware;
> @@ -89,6 +90,9 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
>          SqsConsumer sqsConsumer = new SqsConsumer(this, processor);
>          configureConsumer(sqsConsumer);
>          sqsConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
> +        DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler();
> +        scheduler.setConcurrentTasks(configuration.getConcurrentConsumers());
> +        sqsConsumer.setScheduler(scheduler);
>          return sqsConsumer;
>      }
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/b80021a1/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
> new file mode 100644
> index 0000000..9de5627
> --- /dev/null
> +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConcurrentConsumerTest.java
> @@ -0,0 +1,100 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.aws.sqs;
> +
> +import java.util.HashSet;
> +import java.util.Set;
> +import java.util.concurrent.TimeUnit;
> +
> +import com.amazonaws.services.sqs.model.Message;
> +import org.apache.camel.Exchange;
> +import org.apache.camel.Processor;
> +import org.apache.camel.builder.NotifyBuilder;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.impl.JndiRegistry;
> +import org.apache.camel.test.junit4.CamelTestSupport;
> +import org.junit.Test;
> +
> +
> +
> +/**
> + * Created by ceposta
> + * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
> + */
> +public class SqsConcurrentConsumerTest extends CamelTestSupport {
> +    private static final int NUM_CONCURRENT = 10;
> +    private static final int NUM_MESSAGES = 100;
> +
> +    final Set<Long> threadNumbers = new HashSet<Long>();
> +
> +    @Test
> +    public void consumeMessagesFromQueue() throws Exception {
> +        NotifyBuilder notifier = new NotifyBuilder(context).whenCompleted(NUM_MESSAGES).create();
> +        assertTrue("We didn't process "
> +                + NUM_MESSAGES
> +                + " messages as we expected!", notifier.matches(5, TimeUnit.SECONDS));
> +
> +
> +
> +        // simple test to make sure all N concurrent consumers were used in the test
> +        if (threadNumbers.size() != NUM_CONCURRENT) {
> +            fail(String.format("We were expecting to have %d numbers of concurrent consumers,
but only found %d",
> +                    NUM_CONCURRENT, threadNumbers.size()));
> +        }
> +
> +
> +    }
> +
> +    @Override
> +    protected JndiRegistry createRegistry() throws Exception {
> +        JndiRegistry reg = super.createRegistry();
> +        AmazonSQSClientMock client = new AmazonSQSClientMock();
> +        createDummyMessages(client, NUM_MESSAGES);
> +        reg.bind("client", client);
> +        return reg;
> +    }
> +
> +    private void createDummyMessages(AmazonSQSClientMock client, int numMessages) {
> +        for (int counter = 0; counter < numMessages; counter++) {
> +            Message message = new Message();
> +            message.setBody("Message " + counter);
> +            message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
> +            message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
> +            message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
> +            client.messages.add(message);
> +        }
> +    }
> +
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("aws-sqs://demo?concurrentConsumers=" + NUM_CONCURRENT + "&maxMessagesPerPoll=10&amazonSQSClient=#client")
> +                        .process(new Processor() {
> +                            @Override
> +                            public void process(Exchange exchange) throws Exception
{
> +                                threadNumbers.add(Thread.currentThread().getId());
> +                            }
> +                        }).log("processed a new message!");
> +            }
> +        };
> +    }
> +
> +
> +}
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Mime
View raw message