Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B4BFA63B9 for ; Tue, 28 Jun 2011 12:17:20 +0000 (UTC) Received: (qmail 13034 invoked by uid 500); 28 Jun 2011 12:17:20 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 12974 invoked by uid 500); 28 Jun 2011 12:17:20 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 12967 invoked by uid 99); 28 Jun 2011 12:17:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2011 12:17:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2011 12:17:18 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5D5C52388994 for ; Tue, 28 Jun 2011 12:16:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1140555 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/file/ components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/ Date: Tue, 28 Jun 2011 12:16:58 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110628121658.5D5C52388994@eris.apache.org> Author: davsclaus Date: Tue Jun 28 12:16:57 2011 New Revision: 1140555 URL: http://svn.apache.org/viewvc?rev=1140555&view=rev Log: CAMEL-3655: Reverted back of previous work on this as its not needed anymore. Removed: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumerSupport.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Tue Jun 28 12:16:57 2011 @@ -42,9 +42,9 @@ import org.slf4j.LoggerFactory; */ public abstract class GenericFileConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware { protected final transient Logger log = LoggerFactory.getLogger(getClass()); - protected final ProcessFile processFile; protected GenericFileEndpoint endpoint; protected GenericFileOperations operations; + protected boolean loggedIn; protected String fileExpressionResult; protected int maxMessagesPerPoll; protected volatile ShutdownRunningTask shutdownRunningTask; @@ -54,13 +54,6 @@ public abstract class GenericFileConsume super(endpoint, processor); this.endpoint = endpoint; this.operations = operations; - this.processFile = new ProcessFile(this); - } - - @Override - @SuppressWarnings("unchecked") - public GenericFileEndpoint getEndpoint() { - return (GenericFileEndpoint) super.getEndpoint(); } /** @@ -81,7 +74,7 @@ public abstract class GenericFileConsume // gather list of files to process List> files = new ArrayList>(); - String name = getEndpoint().getConfiguration().getDirectory(); + String name = endpoint.getConfiguration().getDirectory(); // time how long time it takes to poll StopWatch stop = new StopWatch(); @@ -97,21 +90,21 @@ public abstract class GenericFileConsume } // sort files using file comparator if provided - if (getEndpoint().getSorter() != null) { - Collections.sort(files, getEndpoint().getSorter()); + if (endpoint.getSorter() != null) { + Collections.sort(files, endpoint.getSorter()); } // sort using build in sorters so we can use expressions LinkedList exchanges = new LinkedList(); for (GenericFile file : files) { - Exchange exchange = getEndpoint().createExchange(file); - getEndpoint().configureExchange(exchange); - getEndpoint().configureMessage(file, exchange.getIn()); + Exchange exchange = endpoint.createExchange(file); + endpoint.configureExchange(exchange); + endpoint.configureMessage(file, exchange.getIn()); exchanges.add(exchange); } // sort files using exchange comparator if provided - if (getEndpoint().getSortBy() != null) { - Collections.sort(exchanges, getEndpoint().getSortBy()); + if (endpoint.getSortBy() != null) { + Collections.sort(exchanges, endpoint.getSortBy()); } // consume files one by one @@ -163,7 +156,7 @@ public abstract class GenericFileConsume Exchange exchange = (Exchange) exchanges.poll(); GenericFile file = (GenericFile) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); String key = file.getAbsoluteFilePath(); - getEndpoint().getInProgressRepository().remove(key); + endpoint.getInProgressRepository().remove(key); } return total; @@ -258,22 +251,83 @@ public abstract class GenericFileConsume } /** - * Gets the operations to be used - * - * @return the operations - */ - public GenericFileOperations getOperations() { - return operations; - } - - /** * Processes the exchange * * @param exchange the exchange */ protected void processExchange(final Exchange exchange) { - // let the process do the work - processFile.processExchange(exchange); + GenericFile file = getExchangeFileProperty(exchange); + log.trace("Processing file: {}", file); + + // must extract the absolute name before the begin strategy as the file could potentially be pre moved + // and then the file name would be changed + String absoluteFileName = file.getAbsoluteFilePath(); + + // check if we can begin processing the file + try { + final GenericFileProcessStrategy processStrategy = endpoint.getGenericFileProcessStrategy(); + + boolean begin = processStrategy.begin(operations, endpoint, exchange, file); + if (!begin) { + log.debug(endpoint + " cannot begin processing file: {}", file); + // begin returned false, so remove file from the in progress list as its no longer in progress + endpoint.getInProgressRepository().remove(absoluteFileName); + return; + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug(endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(), e); + } + endpoint.getInProgressRepository().remove(absoluteFileName); + return; + } + + // must use file from exchange as it can be updated due the + // preMoveNamePrefix/preMoveNamePostfix options + final GenericFile target = getExchangeFileProperty(exchange); + // must use full name when downloading so we have the correct path + final String name = target.getAbsoluteFilePath(); + try { + // retrieve the file using the stream + log.trace("Retrieving file: {} from: {}", name, endpoint); + + // retrieve the file and check it was a success + boolean retrieved = operations.retrieveFile(name, exchange); + if (!retrieved) { + // throw exception to handle the problem with retrieving the file + // then if the method return false or throws an exception is handled the same in here + // as in both cases an exception is being thrown + throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint); + } + + log.trace("Retrieved file: {} from: {}", name, endpoint); + + // register on completion callback that does the completion strategies + // (for instance to move the file after we have processed it) + exchange.addOnCompletion(new GenericFileOnCompletion(endpoint, operations, target, absoluteFileName)); + + log.debug("About to process file: {} using exchange: {}", target, exchange); + + // process the exchange using the async consumer to support async routing engine + // which can be supported by this file consumer as all the done work is + // provided in the GenericFileOnCompletion + getAsyncProcessor().process(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + // noop + if (log.isTraceEnabled()) { + log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); + } + } + }); + + } catch (Exception e) { + // remove file from the in progress list due to failure + // (cannot be in finally block due to GenericFileOnCompletion will remove it + // from in progress when it takes over and processes the file, which may happen + // by another thread at a later time. So its only safe to remove it if there was an exception) + endpoint.getInProgressRepository().remove(absoluteFileName); + handleException(e); + } } /** @@ -287,7 +341,7 @@ public abstract class GenericFileConsume if (!isMatched(file, isDirectory)) { log.trace("File did not match. Will skip this file: {}", file); return false; - } else if (getEndpoint().isIdempotent() && getEndpoint().getIdempotentRepository().contains(file.getAbsoluteFilePath())) { + } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file); return false; } @@ -328,26 +382,26 @@ public abstract class GenericFileConsume return true; } - if (getEndpoint().getFilter() != null) { - if (!getEndpoint().getFilter().accept(file)) { + if (endpoint.getFilter() != null) { + if (!endpoint.getFilter().accept(file)) { return false; } } - if (ObjectHelper.isNotEmpty(getEndpoint().getExclude())) { - if (name.matches(getEndpoint().getExclude())) { + if (ObjectHelper.isNotEmpty(endpoint.getExclude())) { + if (name.matches(endpoint.getExclude())) { return false; } } - if (ObjectHelper.isNotEmpty(getEndpoint().getInclude())) { - if (!name.matches(getEndpoint().getInclude())) { + if (ObjectHelper.isNotEmpty(endpoint.getInclude())) { + if (!name.matches(endpoint.getInclude())) { return false; } } // use file expression for a simple dynamic file filter - if (getEndpoint().getFileName() != null) { + if (endpoint.getFileName() != null) { evaluateFileExpression(); if (fileExpressionResult != null) { if (!name.equals(fileExpressionResult)) { @@ -357,19 +411,19 @@ public abstract class GenericFileConsume } // if done file name is enabled, then the file is only valid if a done file exists - if (getEndpoint().getDoneFileName() != null) { + if (endpoint.getDoneFileName() != null) { // done file must be in same path as the file - String doneFileName = getEndpoint().createDoneFileName(file.getAbsoluteFilePath()); - ObjectHelper.notEmpty(doneFileName, "doneFileName", getEndpoint()); + String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); + ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); // is it a done file name? - if (getEndpoint().isDoneFile(file.getFileNameOnly())) { + if (endpoint.isDoneFile(file.getFileNameOnly())) { log.trace("Skipping done file: {}", file); return false; } // the file is only valid if the done file exist - if (!getOperations().existsFile(doneFileName)) { + if (!operations.existsFile(doneFileName)) { log.trace("Done file: {} does not exist", doneFileName); return false; } @@ -386,13 +440,13 @@ public abstract class GenericFileConsume */ protected boolean isInProgress(GenericFile file) { String key = file.getAbsoluteFilePath(); - return !getEndpoint().getInProgressRepository().add(key); + return !endpoint.getInProgressRepository().add(key); } private void evaluateFileExpression() { if (fileExpressionResult == null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = new DefaultExchange(getEndpoint().getCamelContext()); + Exchange dummy = new DefaultExchange(endpoint.getCamelContext()); fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); } } @@ -407,35 +461,6 @@ public abstract class GenericFileConsume super.doStart(); // prepare on startup - getEndpoint().getGenericFileProcessStrategy().prepareOnStartup(getOperations(), getEndpoint()); + endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint); } - - /** - * Class the processes the exchange when a file has been polled. - */ - private class ProcessFile extends GenericFileConsumerSupport { - - public ProcessFile(GenericFileConsumer consumer) { - super(consumer); - } - - @Override - void handleExceptionStrategy(Exception e) { - // handle the exception on the consumer - handleException(e); - } - - @Override - void processFileStrategy(Exchange exchange) { - // process the exchange using the async consumer to support async routing engine - // which can be supported by this file consumer as all the done work is - // provided in the GenericFileOnCompletion - getAsyncProcessor().process(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // noop - } - }); - } - } - } Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=1140555&r1=1140554&r2=1140555&view=diff ============================================================================== --- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java (original) +++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java Tue Jun 28 12:16:57 2011 @@ -39,7 +39,7 @@ public abstract class RemoteFileConsumer return (RemoteFileEndpoint) super.getEndpoint(); } - public RemoteFileOperations getOperations() { + protected RemoteFileOperations getOperations() { return (RemoteFileOperations) operations; }