camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [camel] 08/11: CAMEL-12127: camel-ftp - Add option to turn on logging of transfer activity. Let's also see it in the JMX consumer so we can monitor when it last downloaded something.
Date Mon, 08 Jan 2018 10:20:54 GMT
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d4a4fa9fa749799e0dd2da86f1999e32a860000d
Author: Claus Ibsen <claus.ibsen@gmail.com>
AuthorDate: Sun Jan 7 15:32:32 2018 +0100

    CAMEL-12127: camel-ftp - Add option to turn on logging of transfer activity. Let's also see it in the JMX consumer so we can monitor when it last downloaded something.
---
 .../camel/component/file/remote/FtpOperations.java | 60 ++++++++++++----------
 .../component/file/remote/RemoteFileConsumer.java  | 15 +++++-
 .../component/file/remote/RemoteFileProducer.java  | 15 +++++-
 3 files changed, 62 insertions(+), 28 deletions(-)

diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
index b349894..770e19d 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java
@@ -59,7 +59,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
     protected final FTPClientConfig clientConfig;
     protected final CamelLogger transferLogger = new CamelLogger(LoggerFactory.getLogger(FtpClientActivityListener.class));
     protected FtpEndpoint<FTPFile> endpoint;
-    protected volatile FtpClientActivityListener listener;
+    protected FtpClientActivityListener clientActivityListener;
 
     public FtpOperations(FTPClient client, FTPClientConfig clientConfig) {
         this.client = client;
@@ -68,18 +68,26 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
 
     public void setEndpoint(GenericFileEndpoint<FTPFile> endpoint) {
         this.endpoint = (FtpEndpoint<FTPFile>) endpoint;
+        // setup download listener/logger when we have the endpoint configured
+        transferLogger.setLevel(this.endpoint.getTransferLoggingLevel());
+        this.clientActivityListener = new DefaultFtpClientActivityListener(transferLogger,
this.endpoint.isTransferLoggingVerbose(), this.endpoint.getConfiguration().remoteServerInformation());
+    }
+
+    public FtpClientActivityListener getClientActivityListener() {
+        return clientActivityListener;
+    }
+
+    public void setClientActivityListener(FtpClientActivityListener clientActivityListener)
{
+        this.clientActivityListener = clientActivityListener;
     }
 
     public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException
{
-        // setup download listener/logger when we connect
-        transferLogger.setLevel(endpoint.getTransferLoggingLevel());
-        listener = new DefaultFtpClientActivityListener(transferLogger, endpoint.isTransferLoggingVerbose(),
endpoint.getConfiguration().remoteServerInformation());
-        client.setCopyStreamListener(listener);
+        client.setCopyStreamListener(clientActivityListener);
 
         try {
             return doConnect(configuration);
         } catch (GenericFileOperationFailedException e) {
-            listener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
+            clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
             throw e;
         }
     }
@@ -110,7 +118,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
                 if (log.isTraceEnabled() && attempt > 0) {
                     log.trace("Reconnect attempt #{} connecting to {}", attempt, configuration.remoteServerInformation());
                 }
-                listener.onConnecting(host);
+                clientActivityListener.onConnecting(host);
                 client.connect(host, port);
                 // must check reply code if we are connected
                 int reply = client.getReplyCode();
@@ -153,7 +161,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
         }
 
         // we are now connected
-        listener.onConnected(host);
+        clientActivityListener.onConnected(host);
 
         // must enter passive mode directly after connect
         if (configuration.isPassiveMode()) {
@@ -175,7 +183,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
         }
 
         try {
-            listener.onLogin(host);
+            clientActivityListener.onLogin(host);
             boolean login;
             if (username != null) {
                 if (account != null) {
@@ -200,12 +208,12 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
                 // store replyString, because disconnect() will reset it
                 String replyString = client.getReplyString();
                 int replyCode = client.getReplyCode();
-                listener.onLoginFailed(replyCode, replyString);
+                clientActivityListener.onLoginFailed(replyCode, replyString);
                 // disconnect to prevent connection leaks
                 client.disconnect();
                 throw new GenericFileOperationFailedException(replyCode, replyString);
             }
-            listener.onLoginComplete(host);
+            clientActivityListener.onLoginComplete(host);
             client.setFileType(configuration.isBinary() ? FTP.BINARY_FILE_TYPE : FTP.ASCII_FILE_TYPE);
         } catch (IOException e) {
             throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(),
e.getMessage(), e);
@@ -239,14 +247,14 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
         try {
             doDisconnect();
         } catch (GenericFileOperationFailedException e) {
-            listener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
+            clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
             throw e;
         }
     }
 
     protected void doDisconnect() throws GenericFileOperationFailedException {
         // logout before disconnecting
-        listener.onDisconnecting(endpoint.getConfiguration().remoteServerInformation());
+        clientActivityListener.onDisconnecting(endpoint.getConfiguration().remoteServerInformation());
         try {
             log.trace("Client logout");
             client.logout();
@@ -260,7 +268,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
                 throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(),
e.getMessage(), e);
             }
         }
-        listener.onDisconnected(endpoint.getConfiguration().remoteServerInformation());
+        clientActivityListener.onDisconnected(endpoint.getConfiguration().remoteServerInformation());
     }
 
     public boolean deleteFile(String name) throws GenericFileOperationFailedException {
@@ -345,8 +353,8 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
 
     public boolean retrieveFile(String name, Exchange exchange) throws GenericFileOperationFailedException
{
         // store the name of the file to download on the listener
-        listener.setRemoteFileName(name);
-        listener.onBeginDownloading(endpoint.getConfiguration().remoteServerInformation(),
name);
+        clientActivityListener.setRemoteFileName(name);
+        clientActivityListener.onBeginDownloading(endpoint.getConfiguration().remoteServerInformation(),
name);
 
         boolean answer;
         try {
@@ -359,12 +367,12 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
                 answer = retrieveFileToStreamInBody(name, exchange);
             }
         } catch (GenericFileOperationFailedException e) {
-            listener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
+            clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
             throw e;
         }
 
         if (answer) {
-            listener.onDownloadComplete(endpoint.getConfiguration().remoteServerInformation(),
name);
+            clientActivityListener.onDownloadComplete(endpoint.getConfiguration().remoteServerInformation(),
name);
         }
         return answer;
     }
@@ -553,8 +561,8 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
         name = endpoint.getConfiguration().normalizePath(name);
 
         // store the name of the file to upload on the listener
-        listener.setRemoteFileName(name);
-        listener.onBeginUploading(endpoint.getConfiguration().remoteServerInformation(),
name);
+        clientActivityListener.setRemoteFileName(name);
+        clientActivityListener.onBeginUploading(endpoint.getConfiguration().remoteServerInformation(),
name);
 
         log.trace("storeFile({})", name);
 
@@ -578,7 +586,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
             // store the file
             answer = doStoreFile(name, targetName, exchange);
         } catch (GenericFileOperationFailedException e) {
-            listener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
+            clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
             throw e;
         } finally {
             // change back to current directory if we changed directory
@@ -588,7 +596,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
         }
 
         if (answer) {
-            listener.onUploadComplete(endpoint.getConfiguration().remoteServerInformation(),
name);
+            clientActivityListener.onUploadComplete(endpoint.getConfiguration().remoteServerInformation(),
name);
         }
 
         return answer;
@@ -860,7 +868,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
 
     public List<FTPFile> listFiles() throws GenericFileOperationFailedException {
         log.trace("listFiles()");
-        listener.onScanningForFiles(endpoint.remoteServerInformation(), null);
+        clientActivityListener.onScanningForFiles(endpoint.remoteServerInformation(), null);
         try {
             final List<FTPFile> list = new ArrayList<FTPFile>();
             FTPFile[] files = client.listFiles();
@@ -870,14 +878,14 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
             }
             return list;
         } catch (IOException e) {
-            listener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
+            clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
             throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(),
e.getMessage(), e);
         }
     }
 
     public List<FTPFile> listFiles(String path) throws GenericFileOperationFailedException
{
         log.trace("listFiles({})", path);
-        listener.onScanningForFiles(endpoint.remoteServerInformation(), path);
+        clientActivityListener.onScanningForFiles(endpoint.remoteServerInformation(), path);
 
         // use current directory if path not given
         if (ObjectHelper.isEmpty(path)) {
@@ -893,7 +901,7 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> {
             }
             return list;
         } catch (IOException e) {
-            listener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
+            clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(),
e.getMessage());
             throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(),
e.getMessage(), e);
         }
     }
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 2f58c93..d8653e3 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -179,7 +179,20 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
         // recover by re-creating operations which should most likely be able to recover
         if (!loggedIn) {
             log.debug("Trying to recover connection to: {} with a fresh client.", getEndpoint());
-            setOperations(getEndpoint().createRemoteFileOperations());
+            // we want to preserve last FTP activity listener when we set a new operations
+            if (operations instanceof FtpOperations) {
+                FtpOperations ftpOperations = (FtpOperations) operations;
+                FtpClientActivityListener listener = ftpOperations.getClientActivityListener();
+                setOperations(getEndpoint().createRemoteFileOperations());
+                getOperations().setEndpoint(getEndpoint());
+                if (listener != null) {
+                    ftpOperations = (FtpOperations) getOperations();
+                    ftpOperations.setClientActivityListener(listener);
+                }
+            } else {
+                setOperations(getEndpoint().createRemoteFileOperations());
+                getOperations().setEndpoint(getEndpoint());
+            }
             connectIfNecessary();
         }
     }
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
index a06e9c4..ebe5b20 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
@@ -202,7 +202,20 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> implements Ser
         // recover by re-creating operations which should most likely be able to recover
         if (!loggedIn) {
             log.debug("Trying to recover connection to: {} with a new FTP client.", getEndpoint());
-            setOperations(getEndpoint().createRemoteFileOperations());
+            // we want to preserve last FTP activity listener when we set a new operations
+            if (operations instanceof FtpOperations) {
+                FtpOperations ftpOperations = (FtpOperations) operations;
+                FtpClientActivityListener listener = ftpOperations.getClientActivityListener();
+                setOperations(getEndpoint().createRemoteFileOperations());
+                getOperations().setEndpoint(getEndpoint());
+                if (listener != null) {
+                    ftpOperations = (FtpOperations) getOperations();
+                    ftpOperations.setClientActivityListener(listener);
+                }
+            } else {
+                setOperations(getEndpoint().createRemoteFileOperations());
+                getOperations().setEndpoint(getEndpoint());
+            }
             connectIfNecessary();
         }
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <commits@camel.apache.org>.

Mime
View raw message