camel-users mailing list archives

From Chris Wolf <cwolf.a...@gmail.com>
Subject Cannot get SFTP component to stop polling
Date Fri, 07 Jun 2013 18:44:21 GMT
I am using Camel 2.10.4 and need to use SFTP to fetch files.  Since
the sftp consumer does not support dynamic properties (connecting to a
different host/directory/user/pass, etc.), I implemented a custom
processor which sets up the SftpComponent, SftpEndpoint and
SftpConsumer in a way that I believe is very similar to the "sftp://"
component, but instead of parsing the URI for the settings, it gets
the settings from the inbound body of type Map<String, Object>.

It all works, except that after a certain time period I need it to
stop polling.  I tried suspending the route it's in, but it is still
polling.  I tried stopping the route, and it's still polling!  How can
I get it to stop?  Through JMX, I can see that the route is definitely
stopped, so how can the SftpConsumer still be polling?


Thanks,


Chris


Here's roughly what my custom Processor looks like (logging and error
handling stripped out):

public class SftpDownloader implements Processor {
    protected SftpComponent sftpComponent;
    protected SftpEndpoint  sftpEndpoint;
    protected SftpConsumer  sftpConsumer;
    protected org.apache.camel.ProducerTemplate producer;
    protected String toEndpointURI;

    public void process(Exchange exchange) throws Exception {
        Map<String, Object> ftpProps = exchange.getIn().getBody(Map.class);
        CamelContext context = exchange.getContext();

        if (this.producer == null) {
            this.producer = context.createProducerTemplate();
            this.producer.setDefaultEndpointUri(this.toEndpointURI);
        }

        configure(context, ftpProps, endpointURIQueryString);
    }

    void configure(CamelContext context, Map<String, Object> parameters,
            String queryStr) throws Exception {
        String initialURI = String.format("sftp://%s/%s",
                parameters.get("host"), parameters.get("directory"));
        sftpEndpoint = context.getEndpoint(initialURI, SftpEndpoint.class);
        sftpComponent = (SftpComponent) sftpEndpoint.getComponent();

        SftpConfiguration conf = sftpEndpoint.getConfiguration();

        // set reference properties first as they use # syntax that
        // fools the regular properties setter
        EndpointHelper.setReferenceProperties(context, conf, parameters);
        EndpointHelper.setProperties(context, conf, parameters);
        EndpointHelper.setReferenceProperties(context, sftpEndpoint,
                parameters);
        EndpointHelper.setProperties(context, sftpEndpoint, parameters);

        sftpConsumer = (SftpConsumer) sftpEndpoint.createConsumer(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                producer.send(exchange);
            }
        });

        ((ScheduledPollConsumer) sftpConsumer).setStartScheduler(true);
        sftpConsumer.start();
    }
}
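
One likely reading of the symptom described above: the consumer in this processor is created and started by hand, so the route never learns about it, and stopping or suspending the route only affects the services the route itself owns. The sketch below illustrates that lifecycle gap in plain Java, without any Camel classes. `ManualPoller` and `FakeRoute` are hypothetical stand-ins invented for illustration, not Camel APIs, and the real Camel internals may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a scheduled poll consumer; not a Camel class. */
class ManualPoller {
    private final AtomicInteger polls = new AtomicInteger();
    private ScheduledExecutorService scheduler;

    /** Starts a background task that "polls" every 10 ms. */
    synchronized void start() {
        if (scheduler != null) {
            return; // already running
        }
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(polls::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
    }

    /** Stops the background task and waits briefly for it to finish. */
    synchronized void stop() {
        if (scheduler == null) {
            return; // not running
        }
        scheduler.shutdownNow();
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler = null;
    }

    synchronized boolean isRunning() {
        return scheduler != null;
    }

    int pollCount() {
        return polls.get();
    }
}

/** Hypothetical stand-in for a route: it stops only the services it owns. */
class FakeRoute {
    private final List<ManualPoller> ownedServices = new ArrayList<>();

    void own(ManualPoller poller) {
        ownedServices.add(poller);
    }

    void stop() {
        for (ManualPoller poller : ownedServices) {
            poller.stop();
        }
    }
}

public class PollerLifecycleDemo {
    public static void main(String[] args) {
        FakeRoute route = new FakeRoute();
        ManualPoller poller = new ManualPoller();
        poller.start(); // started by hand, never registered with the route

        route.stop();   // the route owns nothing, so nothing is stopped
        System.out.println("running after route.stop(): " + poller.isRunning());

        poller.stop();  // only an explicit stop ends the polling
        System.out.println("running after poller.stop(): " + poller.isRunning());
    }
}
```

If this reading is right, the analogous step in the processor above would be to keep the `sftpConsumer` reference and call `sftpConsumer.stop()` explicitly when polling should end, rather than relying on the route's state.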
