camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fongys <fongyatsan+ca...@gmail.com>
Subject Camel-kafka error on too long on synchronous invocation
Date Fri, 20 May 2016 10:03:13 GMT
When using camel-kafka, if the processor / synchronous to (like direct, etc)
take too long (>30s) to process, it will cause the kafka re-balanced.

The issue is caused by the kafka client send heart beat to broker in the
poll function. After receive the ConsumerRecord from poll, there is no
separate thread to send heart beat to broker. If the processor process the
exchange too long and cause the next poll happen > 30s later, the kafka
client will be kicked off and rebalance occur. This may make the route never
success.

If multiple ConsumerRecords return from a single poll and the total time to
process the records > 30s, it will happen too.

Suggested to use a separate thread to process the exchange and pause the
Kafka client if needed.

Example:
Future future = processorExecutor.submit(new Runnable() {
							@Override
							public void run() {
								try {
									processor.process(exchange);
								} catch (Exception e) {
									getExceptionHandler().handleException("Error during processing",
exchange, e);
								}
							}
						});

while(!processFinish){
							try{
								// Wait at most 25s
								LOG.debug("Waiting process to finish");
								processorFuture.get(25000 - (System.currentTimeMillis() -
lastPollTime), TimeUnit.MILLISECONDS);
								processFinish = true;								
							}catch(TimeoutException e){
								if (consumerFetcherStatus){
									LOG.info("Pause consumer");
									consumer.pause((TopicPartition[]) consumer.assignment().toArray(new
TopicPartition[consumer.assignment().size()]));
									consumerFetcherStatus = false;
								}
								
								LOG.info("Call poll(1) to make sure kafka client keep alive");
								records = consumer.poll(1);
								lastPollTime = System.currentTimeMillis();
								if (records.count() > 0){
									LOG.info("There should be nothing, but may be some partition arrive
on rebalance, try pause again");
									consumer.pause((TopicPartition[]) consumer.assignment().toArray(new
TopicPartition[consumer.assignment().size()]));
									putToList(records);
								}
							} catch (InterruptedException|ExecutionException e) {
								LOG.info("Encounter exception on waiting processor thread", e);
							}
						}


Also please note the rebalance on Kafka. When rebalance happen, the assigned
topic partition will all get revoked and any "not yet processe" message
should be discard as those messages' topic partition may re-assign to other
client.



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-kafka-error-on-too-long-on-synchronous-invocation-tp5782821.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message