camel-users mailing list archives

From selva <sgpsel...@gmail.com>
Subject Re: Quartz clustering in camel spring DSL - JIRA CAMEL-8076
Date Tue, 02 Dec 2014 10:54:45 GMT
Hi Willem,

I downloaded the Camel source (2.14.x from GitHub), debugged the Camel Quartz
component, and found the reason for the issue I am facing.

As I posted earlier, the issue occurs in a clustered environment: the Camel
application is deployed as a standalone program running as 2 instances. If one
instance goes down while executing a route, the other instance should trigger
the same route immediately, because we have configured recoverableJob=true on
the quartz2 endpoint.

What actually happens is that the second instance fires the quartz2 endpoint
but does not execute my processor (QueryBuilderProcessor).

Example:

<route id="quartz" trace="true">
    <from uri="quartz2://cluster/quartz?cron=0+0/4+*+*+*+?&amp;durableJob=true&amp;stateful=true&amp;recoverableJob=true" />
    <to uri="bean:QueryBuilderProcessor" />
</route>


While debugging I found that, in CamelJob.java, the trigger key in the normal
flow and the trigger key during recovery are different. The triggerKey
comparison shown under Step 1 below therefore fails, and execution falls
through to the else block that logs "Cannot find existing QuartzEndpoint".


    public void execute(JobExecutionContext context) throws JobExecutionException {
        Exchange exchange = null;
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Running CamelJob jobExecutionContext={}", context);
            }

            CamelContext camelContext = getCamelContext(context);
            // Step 1
            QuartzEndpoint endpoint = lookupQuartzEndpoint(camelContext, context);
            exchange = endpoint.createExchange();
            exchange.setIn(new QuartzMessage(exchange, context));
            // Step 2
            endpoint.getConsumerLoadBalancer().process(exchange);
            // ... (rest of the method omitted)



Step 1: Quartz endpoint creation


        if (triggerKey.equals(checkTriggerKey)) {
            return quartzEndpoint;
        }


        if (camelContext.hasEndpoint(endpointUri) != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Getting Endpoint from camelContext.");
            }
            result = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
        } else {
            LOG.warn("Cannot find existing QuartzEndpoint with uri: {}. Creating new endpoint instance.", endpointUri);
            result = camelContext.getEndpoint(endpointUri, QuartzEndpoint.class);
        }



Since this creates a new Quartz endpoint, its consumerLoadBalancer object is
null.

So, as the next step, the code below runs and creates a new
consumerLoadBalancer object.

Step 2:


 public LoadBalancer getConsumerLoadBalancer() {
        if (consumerLoadBalancer == null) {
            consumerLoadBalancer = new RoundRobinLoadBalancer();
        }
        return consumerLoadBalancer;
    }

Since the load balancer is created as a new object, its processors property is
empty, so my QueryBuilderProcessor is never called.

In the normal flow (not recovery) I can see the list of processors in the
processors property of consumerLoadBalancer.
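
To make the effect concrete, here is a minimal, self-contained sketch of the behaviour described above. It does not use Camel at all: SketchEndpoint and SketchBalancer are hypothetical stand-ins for QuartzEndpoint and RoundRobinLoadBalancer, and the sketch assumes that a load balancer with no registered processors simply does nothing with the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a round-robin load balancer: it only
// dispatches when at least one processor has been registered.
class SketchBalancer {
    private final List<Consumer<String>> processors = new ArrayList<>();
    private int next = 0;

    void addProcessor(Consumer<String> processor) {
        processors.add(processor);
    }

    // Returns true if some processor handled the message.
    boolean process(String message) {
        if (processors.isEmpty()) {
            return false; // nothing registered: the message is dropped
        }
        Consumer<String> chosen = processors.get(next % processors.size());
        next++;
        chosen.accept(message);
        return true;
    }
}

// Hypothetical stand-in for the endpoint with its lazily created balancer.
class SketchEndpoint {
    private SketchBalancer balancer;

    SketchBalancer getConsumerLoadBalancer() {
        if (balancer == null) {
            balancer = new SketchBalancer(); // new and therefore empty
        }
        return balancer;
    }
}

class RecoverySketch {
    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();

        // Endpoint owned by a started route: its consumer registered a processor.
        SketchEndpoint routeEndpoint = new SketchEndpoint();
        routeEndpoint.getConsumerLoadBalancer().addProcessor(handled::add);

        // Endpoint created on the fly during recovery: nothing registered.
        SketchEndpoint freshEndpoint = new SketchEndpoint();

        System.out.println(routeEndpoint.getConsumerLoadBalancer().process("normal"));
        System.out.println(freshEndpoint.getConsumerLoadBalancer().process("recovery"));
        System.out.println(handled);
    }
}
```

In this model only the endpoint that a route consumer has registered with ever reaches a processor, which matches what I see in the debugger: the recovery flow gets a fresh endpoint and the exchange goes nowhere.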

Temporary fix for testing: the code below works fine in the recovery flow,
with immediate retry.

As a temporary fix we modified the lookupQuartzEndpoint method in
CamelJob.java as shown below (our changes are marked with "Added" and
"Changed" comments).

    protected QuartzEndpoint lookupQuartzEndpoint(CamelContext camelContext, JobExecutionContext quartzContext) throws JobExecutionException {
        TriggerKey triggerKey = quartzContext.getTrigger().getKey();
        // Added: job details, used to match the endpoint during recovery
        JobDetail jobDetail = quartzContext.getJobDetail();
        JobKey jobKey = jobDetail.getKey();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Looking up existing QuartzEndpoint with triggerKey={}", triggerKey);
        }

        // check all active routes for the quartz endpoint this task matches
        // as we prefer to use the existing endpoint from the routes
        for (Route route : camelContext.getRoutes()) {
            if (route.getEndpoint() instanceof QuartzEndpoint) {
                QuartzEndpoint quartzEndpoint = (QuartzEndpoint) route.getEndpoint();
                TriggerKey checkTriggerKey = quartzEndpoint.getTriggerKey();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Checking route endpoint={} with checkTriggerKey={}", quartzEndpoint, checkTriggerKey);
                }
                // Changed: also match on the job key when the job requests recovery
                if (triggerKey.equals(checkTriggerKey)
                        || (jobDetail.requestsRecovery()
                            && jobKey.getGroup().equals(checkTriggerKey.getGroup())
                            && jobKey.getName().equals(checkTriggerKey.getName()))) {
                    return quartzEndpoint;
                }
            }
        }
        // ... (rest of the method unchanged)
    }


Apart from the trigger key, all the job details are the same as in the
existing QuartzEndpoint, so we added this condition to return the existing
Quartz endpoint instead of creating a new one in the recovery flow.
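
The matching rule itself can be tried out in isolation. The sketch below is self-contained and does not use Quartz: SketchKey is a hypothetical stand-in for TriggerKey/JobKey (a name plus a group), and the key values in main, including the recovery trigger's name and group, are illustrative assumptions rather than values copied from a real scheduler.

```java
import java.util.Objects;

// Hypothetical stand-in for Quartz's TriggerKey / JobKey: a (name, group) pair.
final class SketchKey {
    final String name;
    final String group;

    SketchKey(String name, String group) {
        this.name = name;
        this.group = group;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof SketchKey)) {
            return false;
        }
        SketchKey that = (SketchKey) other;
        return name.equals(that.name) && group.equals(that.group);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, group);
    }
}

final class EndpointMatcher {
    // Mirrors the modified condition: match on the fired trigger key, or,
    // for a job that requests recovery, on the job key's name and group.
    static boolean matches(SketchKey firedTrigger, SketchKey jobKey,
                           boolean requestsRecovery, SketchKey endpointTrigger) {
        if (firedTrigger.equals(endpointTrigger)) {
            return true;
        }
        return requestsRecovery
                && jobKey.group.equals(endpointTrigger.group)
                && jobKey.name.equals(endpointTrigger.name);
    }
}

class MatcherDemo {
    public static void main(String[] args) {
        SketchKey endpointTrigger = new SketchKey("quartz", "cluster");
        SketchKey jobKey = new SketchKey("quartz", "cluster");
        // Illustrative recovery trigger key, assumed to differ from the endpoint's.
        SketchKey recoveryTrigger = new SketchKey("recover_node1_1", "RECOVERING_JOBS");

        // Normal firing: the trigger keys are equal.
        System.out.println(EndpointMatcher.matches(endpointTrigger, jobKey, true, endpointTrigger));
        // Recovery firing: the trigger keys differ, so the job key decides.
        System.out.println(EndpointMatcher.matches(recoveryTrigger, jobKey, true, endpointTrigger));
        // Same firing without the recovery flag: no match.
        System.out.println(EndpointMatcher.matches(recoveryTrigger, jobKey, false, endpointTrigger));
    }
}
```

Note that the extra clause only changes the outcome when the trigger keys differ, so the normal flow is unaffected by it.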

Summary:
The problem we found is that when a new Quartz endpoint is created, its
consumerLoadBalancer is null, so the lazily created replacement has no
processors.

Please let us know the right way to get the consumerLoadBalancer populated
when a new Quartz endpoint is created in the recovery flow.

Thanks,
selva





--
View this message in context: http://camel.465427.n5.nabble.com/Quartz-clustering-in-camel-spring-DSL-JIRA-CAMEL-8076-tp5759589p5759928.html
Sent from the Camel - Users mailing list archive at Nabble.com.
