Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 75924 invoked from network); 21 Jan 2010 23:02:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 21 Jan 2010 23:02:50 -0000 Received: (qmail 42711 invoked by uid 500); 21 Jan 2010 23:02:50 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 42687 invoked by uid 500); 21 Jan 2010 23:02:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42678 invoked by uid 99); 21 Jan 2010 23:02:50 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jan 2010 23:02:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jan 2010 23:02:49 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 37F4923888BD; Thu, 21 Jan 2010 23:02:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r901911 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java Date: Thu, 21 Jan 2010 23:02:29 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100121230229.37F4923888BD@eris.apache.org> Author: tabish Date: Thu Jan 21 23:02:28 2010 New Revision: 901911 URL: http://svn.apache.org/viewvc?rev=901911&view=rev Log: fix for https://issues.apache.org/activemq/browse/AMQ-2548 don't close the ftp connection until the stream is closed. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java?rev=901911&r1=901910&r2=901911&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/blob/FTPBlobDownloadStrategy.java Thu Jan 21 23:02:28 2010 @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.FilterInputStream; import java.net.ConnectException; import java.net.URL; @@ -35,36 +36,42 @@ public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException { URL url = message.getURL(); - + setUserInformation(url.getUserInfo()); String connectUrl = url.getHost(); int port = url.getPort() < 1 ? 21 : url.getPort(); - FTPClient ftp = new FTPClient(); + final FTPClient ftp = new FTPClient(); try { - ftp.connect(connectUrl, port); + ftp.connect(connectUrl, port); } catch(ConnectException e) { - throw new JMSException("Problem connecting the FTP-server"); + throw new JMSException("Problem connecting the FTP-server"); } - + if(!ftp.login(ftpUser, ftpPass)) { - ftp.quit(); + ftp.quit(); ftp.disconnect(); throw new JMSException("Cant Authentificate to FTP-Server"); } String path = url.getPath(); String workingDir = path.substring(0, path.lastIndexOf("/")); String file = path.substring(path.lastIndexOf("/")+1); - + ftp.changeWorkingDirectory(workingDir); ftp.setFileType(FTPClient.BINARY_FILE_TYPE); - InputStream input = ftp.retrieveFileStream(file); - ftp.quit(); - ftp.disconnect(); - + + InputStream input = new FilterInputStream(ftp.retrieveFileStream(file)) { + + public void close() throws IOException { + in.close(); + ftp.quit(); + ftp.disconnect(); + } + }; + return input; } - + private void setUserInformation(String userInfo) { if(userInfo != null) { String[] userPass = userInfo.split(":"); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java?rev=901911&r1=901910&r2=901911&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java Thu Jan 21 23:02:28 2010 @@ -27,7 +27,6 @@ import junit.framework.TestCase; import org.apache.activemq.command.ActiveMQBlobMessage; -import org.apache.commons.net.ftp.FTPClient; import org.apache.ftpserver.FtpServer; import org.apache.ftpserver.FtpServerFactory; import org.apache.ftpserver.ftplet.UserManager; @@ -37,106 +36,116 @@ import org.jmock.Mockery; public class FTPBlobDownloadStrategyTest extends TestCase { - + private static final String ftpServerListenerName = "default"; private FtpServer server; final static String userNamePass = "activemq"; - Mockery context = null; - int ftpPort; - String ftpUrl; - - protected void setUp() throws Exception { - final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest"); - ftpHomeDirFile.mkdirs(); - ftpHomeDirFile.getParentFile().deleteOnExit(); + Mockery context = null; + int ftpPort; + String ftpUrl; + + final int FILE_SIZE = Short.MAX_VALUE * 10; - FtpServerFactory serverFactory = new FtpServerFactory(); - ListenerFactory factory = new ListenerFactory(); + protected void setUp() throws Exception { + final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest"); + ftpHomeDirFile.mkdirs(); + ftpHomeDirFile.getParentFile().deleteOnExit(); - PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory(); - UserManager userManager = userManagerFactory.createUserManager(); + FtpServerFactory serverFactory = new FtpServerFactory(); + ListenerFactory factory = new ListenerFactory(); - BaseUser user = new BaseUser(); + PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory(); + UserManager userManager = userManagerFactory.createUserManager(); + + BaseUser user = new BaseUser(); user.setName("activemq"); user.setPassword("activemq"); user.setHomeDirectory(ftpHomeDirFile.getParent()); - + userManager.save(user); - serverFactory.setUserManager(userManager); - factory.setPort(0); - serverFactory.addListener(ftpServerListenerName, factory - .createListener()); - server = serverFactory.createServer(); - server.start(); - ftpPort = serverFactory.getListener(ftpServerListenerName) - .getPort(); - - ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:" - + ftpPort + "/ftptest/"; - - File uploadFile = new File(ftpHomeDirFile, "test.txt"); - FileWriter wrt = new FileWriter(uploadFile); - wrt.write("hello world"); - wrt.close(); + serverFactory.setUserManager(userManager); + factory.setPort(0); + serverFactory.addListener(ftpServerListenerName, factory + .createListener()); + server = serverFactory.createServer(); + server.start(); + ftpPort = serverFactory.getListener(ftpServerListenerName) + .getPort(); + + ftpUrl = "ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + + ftpPort + "/ftptest/"; + + File uploadFile = new File(ftpHomeDirFile, "test.txt"); + FileWriter wrt = new FileWriter(uploadFile); + + wrt.write("hello world"); + + for(int ix = 0; ix < FILE_SIZE; ++ix ) { + wrt.write("a"); + } + + wrt.close(); + + } + + public void testDownload() { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + InputStream stream; + try { + message.setURL(new URL(ftpUrl + "test.txt")); + stream = strategy.getInputStream(message); + int i = stream.read(); + StringBuilder sb = new StringBuilder(2048); + while(i != -1) { + sb.append((char)i); + i = stream.read(); + } + Assert.assertEquals("hello world", sb.toString().substring(0, "hello world".length())); + Assert.assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length()); + + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + } + + public void testWrongAuthentification() { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + try { + message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/")); + strategy.getInputStream(message); + } catch(JMSException e) { + Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage()); + return; + } catch(Exception e) { + System.out.println(e); + Assert.assertTrue("Wrong Exception "+ e, false); + return; + } + + Assert.assertTrue("Expect Exception", false); + } + + public void testWrongFTPPort() { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); + try { + message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/")); + strategy.getInputStream(message); + } catch(JMSException e) { + Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage()); + return; + } catch(Exception e) { + e.printStackTrace(); + Assert.assertTrue("Wrong Exception "+ e, false); + return; + } + Assert.assertTrue("Expect Exception", false); } - - public void testDownload() { - ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); - InputStream stream; - try { - message.setURL(new URL(ftpUrl + "test.txt")); - stream = strategy.getInputStream(message); - int i = stream.read(); - StringBuilder sb = new StringBuilder(10); - while(i != -1) { - sb.append((char)i); - i = stream.read(); - } - Assert.assertEquals("hello world", sb.toString()); - } catch (Exception e) { - e.printStackTrace(); - Assert.assertTrue(false); - } - } - - public void testWrongAuthentification() { - ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); - try { - message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/")); - strategy.getInputStream(message); - } catch(JMSException e) { - Assert.assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage()); - return; - } catch(Exception e) { - System.out.println(e); - Assert.assertTrue("Wrong Exception "+ e, false); - return; - } - - Assert.assertTrue("Expect Exception", false); - } - - public void testWrongFTPPort() { - ActiveMQBlobMessage message = new ActiveMQBlobMessage(); - BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(); - try { - message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/")); - strategy.getInputStream(message); - } catch(JMSException e) { - Assert.assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage()); - return; - } catch(Exception e) { - e.printStackTrace(); - Assert.assertTrue("Wrong Exception "+ e, false); - return; - } - - Assert.assertTrue("Expect Exception", false); - } }