camel-users mailing list archives

From Roman Vottner <r...@gmx.at>
Subject Testing rainy-day cases of consumers that consume in their own thread
Date Thu, 29 Sep 2016 13:03:01 GMT
We have a custom Camel component named CamelRedisSeda, basically an extension of UriEndpointComponent,
which pops items from and pushes items to a Redis queue. For testing, both the Redis client and
the connection are mocked and backed by a blocking queue as a replacement for the Redis queue.
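
For illustration, the mock wiring can be sketched roughly like this (a minimal sketch assuming
Mockito; ObjectCodec and RedisSedaPayload are our own types, and the RedisConnection/KeyValue
signatures follow the Lettuce-style client used below — KeyValue construction may differ
depending on the client version):

// Sketch: back the mocked RedisConnection with a BlockingQueue so that
// rpush/blpop behave like RPUSH/BLPOP against a real Redis list.
final BlockingQueue<RedisSedaPayload> backingQueue = new LinkedBlockingQueue<>();

@SuppressWarnings("unchecked")
RedisConnection<String, RedisSedaPayload> con = mock(RedisConnection.class);

// the mocked client hands out the mocked connection
when(redisClient.connect(Matchers.<ObjectCodec<RedisSedaPayload>>any())).thenReturn(con);

// rpush(key, values...) appends every pushed payload to the backing queue
when(con.rpush(anyString(), Matchers.<RedisSedaPayload>anyVararg()))
    .thenAnswer(new Answer<Long>() {
      @Override
      public Long answer(InvocationOnMock invocation) {
        Object[] args = invocation.getArguments();
        for (int i = 1; i < args.length; i++) {
          backingQueue.offer((RedisSedaPayload) args[i]);
        }
        return (long) (args.length - 1);
      }
    });

// blpop(timeout, keys...) blocks on the backing queue like Redis BLPOP would
when(con.blpop(anyLong(), Matchers.<String>anyVararg()))
    .thenAnswer(new Answer<KeyValue<String, RedisSedaPayload>>() {
      @Override
      public KeyValue<String, RedisSedaPayload> answer(InvocationOnMock invocation)
          throws InterruptedException {
        Object[] args = invocation.getArguments();
        RedisSedaPayload value = backingQueue.poll((Long) args[0], TimeUnit.SECONDS);
        return value == null ? null
            : new KeyValue<String, RedisSedaPayload>((String) args[1], value);
      }
    });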

I test the consumer of the component using the ConsumerTemplate like this:

@Test
public void testRedisSedaConsumerForMultipleAvailableMessages() throws Exception {

  String queueName = "redis-seda:q1";
  RedisConnection<String, RedisSedaPayload> con =
      redisClient.connect(new ObjectCodec<RedisSedaPayload>());
  con.rpush(queueName,
            new RedisSedaPayload("test"),
            new RedisSedaPayload("test2"),
            new RedisSedaPayload("test3"));

  ConsumerTemplate consumerTemplate = consumer();
  String body = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient",
      500, String.class);
  String body2 = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient",
      500, String.class);
  String body3 = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient",
      500, String.class);
  String body4 = consumerTemplate.receiveBody("redis-seda://q1?redisClient=#redisClient",
      500, String.class);

  assertThat(body, is(equalTo("test")));
  assertThat(body2, is(equalTo("test2")));
  assertThat(body3, is(equalTo("test3")));
  assertThat(body4, is(nullValue()));
}

which works quite well. However, while patching one of our servers recently, which also
required a restart, we noticed that one of our messages got lost during the restart. To test this
behavior I wanted to create a test case which fills the mocked queue with plenty of messages
and shuts down the Camel context while the consumer pops messages from the queue.

Basically the test first creates 1000 entries which the consumer should consume. I then create
a worker thread which stops the context (and the consumer) after the 5th message has been
processed. Each pop is currently mocked with a slight sleep delay (sketched after the test below),
so that the whole test takes a couple of seconds and the shutdown thread has a chance to perform
its task.

  @Test
  public void testCamelShutdownWhileConsumingMessages() throws Exception {
    int maxItems = 1000;
    String queueName = "redis-seda:q1";
    RedisConnection<String, RedisSedaPayload> con =
        redisClient.connect(new ObjectCodec<RedisSedaPayload>());
    for (int i = 1; i <= maxItems; i++) {
      con.rpush(queueName, new RedisSedaPayload(i));
    }

    ConsumerTemplate consumerTemplate = consumer();

    CyclicBarrier barrier = new CyclicBarrier(2);
    camelShutdownThread(barrier).start();

    Exchange exchange = null;
    int lastProcessedItem = 0;
    do {
      try {
        if (5 == lastProcessedItem) {
          LOG.debug("Consumer test waiting for synchronization point");
          barrier.await();
        }
        exchange =
            consumerTemplate.receive("redis-seda://q1?redisClient=#redisClient", 50);
        if (null != exchange) {
          lastProcessedItem = exchange.getIn().getBody(Integer.class);
        }
      } catch (Exception ex) {
        LOG.warn("Caught exception", ex);
        // extract the item number from the last processed exchange body
        if (null != exchange) {
          lastProcessedItem = exchange.getIn().getBody(Integer.class);
        }
        exchange = null;
      }
    } while (null != exchange);

    LOG.info("Consumed items: {}, available items: {}", lastProcessedItem, con.llen(""));
    assertThat(lastProcessedItem + con.llen(""), is(equalTo((long)maxItems)));
  }
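
The pop delay mentioned above can, for instance, be built into the mocked blpop answer from the
sketch further up (the 100 ms value is an arbitrary example):

// Sketch: delay each pop slightly so the shutdown thread gets a chance to
// run before the queue is drained.
RedisSedaPayload value = backingQueue.poll((Long) args[0], TimeUnit.SECONDS);
Thread.sleep(100); // artificial per-pop delay, example value
return value == null ? null
    : new KeyValue<String, RedisSedaPayload>((String) args[1], value);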

The shutdown thread invoked from within the test looks like this:

private Thread camelShutdownThread(final CyclicBarrier barrier) throws Exception {
  return new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        LOG.debug("Shutdown thread waiting for synchronization point");
        barrier.await();
        LOG.info("Performing shutdown request");
      } catch (BrokenBarrierException | InterruptedException ex) {
        LOG.warn("Caught exception " + ex.getLocalizedMessage(), ex);
      }
      try {
        shutdownCamel();
      } catch (Exception ex) {
        LOG.warn("Caught exception " + ex.getLocalizedMessage(), ex);
      }
    }
  });
}
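
shutdownCamel() is not shown here; a helper like it would typically boil down to stopping the
context (a sketch, assuming the test keeps the CamelContext in a field named context):

private void shutdownCamel() throws Exception {
  // stop() blocks until Camel's shutdown strategy has completed or timed out
  context.stop();
}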

After consuming the 5th element and returning its exchange through the consumer template, the
barrier is tripped and both threads proceed. While the test continues to receive exchanges
(or their bodies), the shutdown thread attempts to stop the Camel context in order to
mimic the behavior we noticed in production.

However, for some reason, even though ServiceSupport internally uses an AtomicBoolean to
synchronize the internal state of the service among threads, the consumer thread keeps consuming
messages from the queue. The stop method of the consumer is invoked only after the test is torn
down, and I'm somehow unable to stop the consumer (or the context) prematurely and thereby fake
the behavior of an application shutdown.
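
One thing that might matter here: context.stop() delegates to the configured ShutdownStrategy,
whose default timeout in Camel 2.x is 300 seconds. For a test, a much shorter timeout can be set
so the stop attempt at least fails fast (a sketch with example values, not our actual setup):

// Sketch: shorten Camel's shutdown timeout before stopping the context
context.getShutdownStrategy().setTimeout(5);
context.getShutdownStrategy().setTimeUnit(TimeUnit.SECONDS);
context.stop();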

Our desired behavior is that any in-flight exchange should finish gracefully, while the component
stops consuming further messages from the queue to avoid data loss. The actual consumer
looks like this:

@Override
protected void doStart() throws Exception {
  executor = endpoint.getCamelContext().getExecutorServiceManager()
      .newSingleThreadExecutor(this, endpoint.getEndpointUri());
  executor.execute(this);
  super.doStart();
}

@Override
protected void doStop() throws Exception {
  LOG.info("Stop signal received for queue {}", queueName);
  super.stop();
  if (executor != null) {
    endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
    executor.shutdown(); // prevents further tasks from being submitted
    executor = null;
  }
  super.doStop();
}

@Override
public void run() {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Starting consumer for queue {}", endpoint.getConfiguration().getQueueName());
  }

  while (isRunAllowed()) {
    …
    KeyValue<String, RedisSedaPayload> popped =
        con.blpop(endpoint.getConfiguration().getPollTimeout(), queueName);
    if (popped != null) {
      RedisSedaPayload redisPayload = popped.value;
      exchange = this.getEndpoint().createExchange();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Consumed message with jobId {}", redisPayload.getId());
      }
      final Object body = redisPayload.getPayload();
      if (body instanceof DefaultExchangeHolder) {
        DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) body);
      } else {
        exchange.getIn().setBody(body);
      }
      try {
        // process using the asynchronous routing engine
        getAsyncProcessor().process(exchange, new AsyncCallback() {
          public void done(boolean asyncDone) {
            // noop
          }
        });

        if (exchange.getException() != null) {
          getExceptionHandler()
              .handleException("Error processing exchange", exchange,
                               exchange.getException());
        }
      } catch (Exception e) {
        LOG.error(CamelExchangeException.createExceptionMessage(
            "Exception in consumer when trying to process exchange", exchange, e));
      }
    }
    // further exception handling omitted
  }
  LOG.debug("Consuming from redis queue {} finished", queueName);
}
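
For the graceful-stop requirement, Camel's ShutdownAware interface looks relevant; a consumer
can take part in the graceful shutdown roughly like this (a minimal sketch, class name assumed,
not something we have wired into the component yet):

// Sketch: let the consumer participate in Camel's graceful shutdown
// (org.apache.camel.spi.ShutdownAware, Camel 2.x method set).
public class RedisSedaConsumer extends DefaultConsumer
    implements Runnable, ShutdownAware {

  private volatile boolean shutdownPending;

  @Override
  public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
    return false; // do not defer; stop popping right away
  }

  @Override
  public int getPendingExchangesSize() {
    return 0; // exchanges are handed off immediately, nothing queued internally
  }

  @Override
  public void prepareShutdown(boolean suspendOnly, boolean forced) {
    shutdownPending = true; // checked by the consume loop
  }

  // the consume loop would then test the flag in addition to isRunAllowed():
  //   while (isRunAllowed() && !shutdownPending) { ... }
}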

I'm aware that the data loss happened between the blocking pop and the hand-off to the
AsyncProcessor: the message had already been consumed from the queue, but the exchange was not
handled by Camel anymore. We might push the message back onto the Redis queue in that case.
However, we wanted to have a test case first that actually mimics the shutdown behavior noticed
in production.
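
The re-push could then look roughly like this inside the consume loop (a sketch; LPUSH puts the
message back at the head of the queue so ordering is preserved):

// Sketch: if the consumer is already stopping after the pop but before the
// exchange was handed to Camel, push the payload back to the queue head.
if (popped != null && !isRunAllowed()) {
  con.lpush(queueName, popped.value);
  break;
}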

Any tips on how to achieve this reliably?



