camel-users mailing list archives

From Chris Wolf
Subject When is an (S)FTP file no longer "in progress"?
Date Thu, 04 Apr 2013 15:11:55 GMT

I created a custom Processor with a producer template to perform SFTP,
rather than using the sftp component normally,
due to the lack of dynamic URI capability for the consumer.

onent.file.remote.SftpConsumer TRACE Skipping as file is already in
progress: CBOE34_MKT_20120319_DAILY.csv

The type of consumer created is ScheduledBatchPollingConsumer. How do
I indicate that a batch is done?  Do I need to
suspend or stop this consumer?  Or does the downstream endpoint, i.e.
"to(...)", have to indicate a response
(acknowledgement) on the exchange once the file has been received?
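
To make the question concrete: one way to read the TRACE line above is that
the consumer remembers file names it has picked up and skips them on later
polls until they are released. The sketch below is a simplified plain-Java
model of that idea, not Camel's implementation; the class InProgressTracker
and its methods tryBegin and complete are hypothetical names used only for
illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model, not a Camel class: tracks file names that a poll has
// picked up but whose processing has not finished yet.
class InProgressTracker {
    private final Set<String> inProgress = ConcurrentHashMap.newKeySet();

    // Returns true if the caller may process the file. A false result is the
    // "skipping as file is already in progress" case.
    boolean tryBegin(String fileName) {
        return inProgress.add(fileName);
    }

    // Releases the file name. If this is never called, every later poll
    // keeps skipping the file.
    void complete(String fileName) {
        inProgress.remove(fileName);
    }

    public static void main(String[] args) {
        InProgressTracker tracker = new InProgressTracker();
        String name = "CBOE34_MKT_20120319_DAILY.csv";
        System.out.println(tracker.tryBegin(name)); // first poll picks the file up
        System.out.println(tracker.tryBegin(name)); // next poll skips it
        tracker.complete(name);                     // processing finished
        System.out.println(tracker.tryBegin(name)); // file is eligible again
    }
}
```

Under this model, the question becomes which event is supposed to trigger
the release step when the consumer is created by hand as in the code below.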

Thanks for any assistance with this matter...


(Note, gmail text-plain will mangle the formatting of this code)

    public void process(Exchange exchange) throws Exception {
        Map<String, Object> ftpProp = exchange.getIn().getBody(Map.class);
        if (ftpProp == null) {
            throw new RuntimeCamelException(
                "No object of type Map<String, Object> in input exchange message.");
        }
        // Assumes an SLF4J-style LOG field; the call prefix is missing in the archived message.
        LOG.info("FTP Properties: {}", ftpProp);

        // The type argument is assumed; the line is truncated in the archived message.
        Integer port = exchange.getIn().getHeader("SFTP_PORT", Integer.class);
        ftpProp.put("port", port);

        CamelContext context = exchange.getContext();
        //String routeId = exchange.getFromRouteId();
        configure(context, ftpProp, endpointURIQueryString);
    }

    void configure(CamelContext context, Map<String, Object> parameters, String queryStr)
            throws Exception {
        parameters.put("separator", RemoteFileConfiguration.PathSeparator.UNIX);
        parameters.put("binary", Boolean.TRUE);
        //parameters.put("disconnect", Boolean.TRUE);
        //parameters.put("passive", Boolean.TRUE);
        String initialURI = String.format("sftp://%s/%s?%s",
            parameters.get("host"), parameters.get("directory"), queryStr);
        sftpEndpoint = context.getEndpoint(initialURI, SftpEndpoint.class);
        sftpComponent = (SftpComponent) sftpEndpoint.getComponent();

        SftpConfiguration conf = sftpEndpoint.getConfiguration();

        // set reference properties first as they use # syntax that
        // fools the regular properties setter
        EndpointHelper.setReferenceProperties(context, conf, parameters);
        EndpointHelper.setProperties(context, conf, parameters);
        EndpointHelper.setReferenceProperties(context, sftpEndpoint, parameters);
        EndpointHelper.setProperties(context, sftpEndpoint, parameters);

        // LOG is an assumed SLF4J-style logger field (call prefix missing in the archive).
        ServiceStatus status = sftpComponent.getStatus();
        LOG.info("***************** Component: {}", status);

        status = sftpEndpoint.getStatus();
        LOG.info("***************** Endpoint: {}", status);
        //EventDrivenPollingConsumer consumer = (EventDrivenPollingConsumer)
        //    sftpEndpoint.createPollingConsumer();

        sftpConsumer = (SftpConsumer)
            sftpEndpoint.createConsumer(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    producer.send(exchange); // send file down-stream...
                }
            });
        Map<String, Object> cprop = sftpEndpoint.getConsumerProperties();
        ((ScheduledPollConsumer) sftpConsumer).setStartScheduler(true);
        status = sftpConsumer.getStatus();
        LOG.info("***************** Consumer: {}", status);
    }

    public void setProducer(ProducerTemplate producer) {
        this.producer = producer;
    }

    public String getEndpointURIQueryString() {
        return endpointURIQueryString;
    }

    public void setEndpointURIQueryString(String endpointURIQueryString) {
        this.endpointURIQueryString = endpointURIQueryString;
    }
