camel-users mailing list archives

From fongys
Subject Camel-kafka error on too long on synchronous invocation
Date Fri, 20 May 2016 10:03:13 GMT
When using camel-kafka, if a processor or a synchronous endpoint (such as
direct) takes too long (> 30s) to process an exchange, it causes a Kafka
rebalance.

The cause is that the Kafka client sends heartbeats to the broker only inside
the poll function. After poll returns the ConsumerRecords, no separate thread
sends heartbeats to the broker. If the processor takes too long on an
exchange, so that the next poll happens more than 30s later, the Kafka client
is kicked out of the group and a rebalance occurs. This may make the route
never ...

The same happens if a single poll returns multiple ConsumerRecords and the
total time to process them exceeds 30s.
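
To make the timing concrete, here is a minimal sketch of the arithmetic. The
30s limit is the figure from this message; the class and method names are
made up for illustration and are not part of camel-kafka or the Kafka client.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: does handling one poll() batch fit in the 30s window? */
final class PollBudget {
    // 30s, the timeout mentioned in this message
    static final long SESSION_TIMEOUT_MS = 30_000L;

    /** Sums the per-record processing times of one batch. */
    static long totalMillis(List<Long> perRecordMillis) {
        long total = 0;
        for (long ms : perRecordMillis) {
            total += ms;
        }
        return total;
    }

    /** True if the next poll would come later than the timeout allows. */
    static boolean wouldTriggerRebalance(List<Long> perRecordMillis) {
        return totalMillis(perRecordMillis) > SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Three records, 12s each: no single record is slow, but the batch is.
        List<Long> batch = Arrays.asList(12_000L, 12_000L, 12_000L);
        System.out.println(totalMillis(batch));           // 36000
        System.out.println(wouldTriggerRebalance(batch)); // true
    }
}
```

So a batch can trip the timeout even when every individual record is
processed well within 30s.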

I suggest processing the exchange in a separate thread and pausing the Kafka
client if needed.

final Future<?> processorFuture = processorExecutor.submit(new Runnable() {
    public void run() {
        try {
            // processing call missing from the archived message;
            // presumably the exchange is passed to the processor here
        } catch (Exception e) {
            getExceptionHandler().handleException("Error during processing", exchange, e);
        }
    }
});

// assumed: the enclosing loop was lost in the archive; processFinish implies one
boolean processFinish = false;
while (!processFinish) {
    try {
        // Wait at most 25s, counted from the last poll
        LOG.debug("Waiting process to finish");
        processorFuture.get(25000 - (System.currentTimeMillis() - lastPollTime), TimeUnit.MILLISECONDS);
        processFinish = true;
    } catch (TimeoutException e) {
        if (consumerFetcherStatus) {
            LOG.debug("Pause consumer");
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
            consumerFetcherStatus = false;
        }
        LOG.debug("Call poll(1) to make sure kafka client keep alive");
        records = consumer.poll(1);
        lastPollTime = System.currentTimeMillis();
        if (records.count() > 0) {
            LOG.debug("There should be nothing, but may be some partition arrive on rebalance, try pause again");
            consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
        }
    } catch (InterruptedException | ExecutionException e) {
        LOG.error("Encounter exception on waiting processor thread", e);
        break; // assumed: stop waiting instead of retrying forever
    }
}
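
The wait-with-keep-alive pattern in the snippet above can be tried without a
broker. The following is only a sketch under that assumption: the keepAlive
callback stands in for the pause() + poll(1) step, and KeepAliveWait and
awaitWithKeepAlive are hypothetical names, not camel-kafka code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class KeepAliveWait {
    /**
     * Waits for the task to finish. Each time maxWaitMs passes without
     * completion, keepAlive is run once and the wait starts again.
     * Returns how many times keepAlive was run.
     */
    static int awaitWithKeepAlive(Future<?> task, long maxWaitMs, Runnable keepAlive) {
        int keepAlives = 0;
        while (true) {
            try {
                task.get(maxWaitMs, TimeUnit.MILLISECONDS);
                return keepAlives;
            } catch (TimeoutException e) {
                keepAlive.run(); // stand-in for pause() + poll(1)
                keepAlives++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("task failed", e.getCause());
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // A "processor" that is slower than the wait window of 100 ms.
            Future<?> slow = executor.submit(() -> {
                try {
                    Thread.sleep(350);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            int calls = awaitWithKeepAlive(slow, 100, () -> System.out.println("keep-alive poll"));
            System.out.println("finished after " + calls + " keep-alive call(s)");
        } finally {
            executor.shutdown();
        }
    }
}
```

The polling thread never blocks longer than the wait window, so in the real
consumer the heartbeat carried by poll would keep being sent while the slow
processor runs.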

Also, please note how rebalancing works in Kafka. When a rebalance happens,
all assigned topic partitions are revoked, and any "not yet processed"
messages should be discarded, because their topic partitions may be
re-assigned to other consumers.
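
As a rough illustration of discarding unprocessed messages, the sketch below
filters a pending list down to the partitions still assigned after a
rebalance. It uses plain strings instead of Kafka types so it runs on its
own; RevokedRecordFilter and PendingRecord are hypothetical names. In a real
consumer this kind of cleanup would typically be driven from
ConsumerRebalanceListener.onPartitionsRevoked, which is an assumption about
where to hook it in, not something this message prescribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RevokedRecordFilter {
    /** Hypothetical stand-in for a fetched record: a partition name and a payload. */
    static final class PendingRecord {
        final String partition;
        final String payload;

        PendingRecord(String partition, String payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    /** Keeps only the pending records whose partition is still assigned to this consumer. */
    static List<PendingRecord> keepStillAssigned(List<PendingRecord> pending, Set<String> assigned) {
        List<PendingRecord> kept = new ArrayList<>();
        for (PendingRecord record : pending) {
            if (assigned.contains(record.partition)) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<PendingRecord> pending = Arrays.asList(
                new PendingRecord("orders-0", "a"),
                new PendingRecord("orders-1", "b"),
                new PendingRecord("orders-0", "c"));
        // After the rebalance this consumer only owns orders-0.
        Set<String> assigned = new HashSet<>(Arrays.asList("orders-0"));
        for (PendingRecord record : keepStillAssigned(pending, assigned)) {
            System.out.println(record.partition + " " + record.payload);
        }
        // prints "orders-0 a" then "orders-0 c"
    }
}
```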
