Author: cutting Date: Thu May 29 13:39:27 2008 New Revision: 661473 URL: http://svn.apache.org/viewvc?rev=661473&view=rev Log: HADOOP-3246. Add FTPFileSystem. Contributed by Ankur Goel. Added: hadoop/core/trunk/lib/commons-net-1.4.1.jar (with props) hadoop/core/trunk/lib/oro-2.0.8.jar (with props) hadoop/core/trunk/lib/slf4j-LICENSE.txt hadoop/core/trunk/lib/slf4j-api-1.4.3.jar (with props) hadoop/core/trunk/lib/slf4j-log4j12-1.4.3.jar (with props) hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/ hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPException.java hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPInputStream.java hadoop/core/trunk/src/test/lib/ hadoop/core/trunk/src/test/lib/ftplet-api-1.0.0-SNAPSHOT.jar (with props) hadoop/core/trunk/src/test/lib/ftpserver-core-1.0.0-SNAPSHOT.jar (with props) hadoop/core/trunk/src/test/lib/ftpserver-server-1.0.0-SNAPSHOT.jar (with props) hadoop/core/trunk/src/test/lib/mina-core-2.0.0-M2-20080407.124109-12.jar (with props) hadoop/core/trunk/src/test/org/apache/hadoop/fs/ftp/ hadoop/core/trunk/src/test/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/build.xml hadoop/core/trunk/conf/hadoop-default.xml hadoop/core/trunk/src/test/hadoop-site.xml hadoop/core/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=661473&r1=661472&r2=661473&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Thu May 29 13:39:27 2008 @@ -108,6 +108,8 @@ HDFS filesystem on systems that support FUSE, e.g., Linux. (Pete Wyckoff via cutting) + HADOOP-3246. Add FTPFileSystem. (Ankur Goel via cutting) + IMPROVEMENTS HADOOP-2928. Remove deprecated FileSystem.getContentLength(). 
Modified: hadoop/core/trunk/build.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=661473&r1=661472&r2=661473&view=diff ============================================================================== --- hadoop/core/trunk/build.xml (original) +++ hadoop/core/trunk/build.xml Thu May 29 13:39:27 2008 @@ -75,6 +75,7 @@ value="${build.dir}/c++-examples/${build.platform}"/> + @@ -148,6 +149,10 @@ + + + + Modified: hadoop/core/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=661473&r1=661472&r2=661473&view=diff ============================================================================== --- hadoop/core/trunk/conf/hadoop-default.xml (original) +++ hadoop/core/trunk/conf/hadoop-default.xml Thu May 29 13:39:27 2008 @@ -198,6 +198,12 @@ + fs.ftp.impl + org.apache.hadoop.fs.ftp.FTPFileSystem + The FileSystem for ftp: uris. + + + fs.ramfs.impl org.apache.hadoop.fs.InMemoryFileSystem The FileSystem for ramfs: uris. Added: hadoop/core/trunk/lib/commons-net-1.4.1.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/commons-net-1.4.1.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/core/trunk/lib/commons-net-1.4.1.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/lib/oro-2.0.8.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/oro-2.0.8.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. 
Propchange: hadoop/core/trunk/lib/oro-2.0.8.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/lib/slf4j-LICENSE.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/slf4j-LICENSE.txt?rev=661473&view=auto ============================================================================== --- hadoop/core/trunk/lib/slf4j-LICENSE.txt (added) +++ hadoop/core/trunk/lib/slf4j-LICENSE.txt Thu May 29 13:39:27 2008 @@ -0,0 +1,568 @@ + + + + + + + + +[slf4j.org-svn] Log of /slf4j/trunk/LICENSE.txt + + + + + +
+ +
+ + + + + + +
+ + + + +[slf4j.org-svn] +/ + + + +slf4j +/ + + + +trunk +/ + + + +LICENSE.txt + + + + + + + +Repository: + + + +
+ +
+ +
+
ViewVC logotype
+

Log of /slf4j/trunk/LICENSE.txt

+ +

+ +Parent Directory Parent Directory + + + + +

+ +
+ + + + + + + + + + + + + + + +
Links to HEAD: +(view) +(download) + +(annotate) +
Sticky Revision:
+ + + + + +
+ +
+ + + + + + + + + +
+
+ + + + +Revision 979 - + +(view) + +(download) + +(annotate) + + + +- [select for diffs] + + + + +
+ +Modified + +Thu Feb 21 18:43:04 2008 UTC (3 months ago) by ceki + + + + + + + + + +
File length: 1159 byte(s) + + + + + + +
Diff to previous 701 + + + + + + + +
- renaming slf4j-converter module to slf4j-migrator
+
+ + + +
+
+ + + + +Revision 701 - + +(view) + +(download) + +(annotate) + + + +- [select for diffs] + + + + +
+ +Modified + +Fri Feb 2 13:51:23 2007 UTC (15 months, 3 weeks ago) by ceki + + + + + + + + + +
File length: 1159 byte(s) + + + + + + +
Diff to previous 687 + + + + + + + +
+While we claim [1] that the SLF4J license is identical to the terms of
+the MIT License [2], Simon Kaegi observed that the "sublicense" right
+was removed from the actual license as published on our site. This was
+an unintentional omission that I just fixed in this SVN commit.
+
+I also updated the copyright dates and removed SLF4J.org as a copyright
+holder since SLF4J does not exist as a legal entity (only QOS.ch
+does).
+
+I also corrected the discrepancy between the license as published in
+[1] and the license included in the SLF4J distributions [3] which
+included the UC Berkeley advertising clause, a leftover form the early
+days where SLF4H was licensed under BSD.
+
+To cut a long story short, all LICENSE.txt files shipping are now
+identical copies containing exactly the same terms as the MIT license.
+
+Given that licensing changes are politically sensitive, please take
+the time to have a look at the changes and let me know if you have any
+objections to the changes just made.
+
+
+[1] http://slf4j.org/license.html
+[2] http://en.wikipedia.org/wiki/MIT_License
+[3] http://svn.slf4j.org/viewvc/slf4j/trunk/LICENSE.txt?revision=687&view=markup
+
+
+
+ + + +
+
+ + + + +Revision 687 - + +(view) + +(download) + +(annotate) + + + +- [select for diffs] + + + + +
+ +Modified + +Thu Jan 18 14:22:17 2007 UTC (16 months, 1 week ago) by seb + + + + + + + + + +
File length: 1580 byte(s) + + + + + + +
Diff to previous 686 + + + + + + + +
test only, please ignore
+
+ + + +
+
+ + + + +Revision 686 - + +(view) + +(download) + +(annotate) + + + +- [select for diffs] + + + + +
+ +Modified + +Thu Jan 18 13:38:32 2007 UTC (16 months, 1 week ago) by seb + + + + + + + + + +
File length: 1579 byte(s) + + + + + + +
Diff to previous 685 + + + + + + + +
test only, please ignore
+
+ + + +
+
+ + + + +Revision 685 - + +(view) + +(download) + +(annotate) + + + +- [select for diffs] + + + + +
+ +Modified + +Thu Jan 18 13:29:33 2007 UTC (16 months, 1 week ago) by seb + + + + + + + + + +
File length: 1581 byte(s) + + + + + + +
Diff to previous 200 + + + + + + + +
test only, please ignore
+
+ + + +
+
+ + + + +Revision 200 - + +(view) + +(download) + +(annotate) + + + +- [select for diffs] + + + + +
+ +Added + +Fri Aug 26 19:46:12 2005 UTC (2 years, 9 months ago) by ceki + + + + + + + +
File length: 1580 byte(s) + + + + + + + + + + +
missing LICENSE file
+
+ + + + + +
+

+This form allows you to request diffs between any two revisions of this file. +For each of the two "sides" of the diff, + +enter a numeric revision. + +

+
+ + + + + + + + + +
  + +Diffs between + + + +and + + + +
  +Type of Diff should be a + + +
+
+ + +
+
+
+ + +Sort log by: + + +
+
+ + +
+ + + + + + + + + +
slf4j.org lists
ViewVC Help
Powered by ViewVC 1.0.1 
+ + + + Added: hadoop/core/trunk/lib/slf4j-api-1.4.3.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/slf4j-api-1.4.3.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/core/trunk/lib/slf4j-api-1.4.3.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/lib/slf4j-log4j12-1.4.3.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/slf4j-log4j12-1.4.3.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/core/trunk/lib/slf4j-log4j12-1.4.3.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPException.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPException.java?rev=661473&view=auto ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPException.java (added) +++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPException.java Thu May 29 13:39:27 2008 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.ftp; + +/** + * A class to wrap a {@link Throwable} into a Runtime Exception. + */ +public class FTPException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public FTPException(String message) { + super(message); + } + + public FTPException(Throwable t) { + super(t); + } + + public FTPException(String message, Throwable t) { + super(message, t); + } +} Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java?rev=661473&view=auto ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java (added) +++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java Thu May 29 13:39:27 2008 @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.ftp; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.FTPReply; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +/** + *

+ * A {@link FileSystem} backed by an FTP client provided by Apache Commons Net. + *

+ */ +public class FTPFileSystem extends FileSystem { + + public static final Log LOG = LogFactory + .getLog("org.apache.hadoop.util.FTPFileSystem"); + + public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; + + public static final int DEFAULT_BLOCK_SIZE = 4 * 1024; + + private URI uri; + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { // get + // get host information from uri (overrides info in conf) + String host = uri.getHost(); + host = (host == null) ? conf.get("fs.ftp.host", null) : host; + if (host == null) { + throw new IOException("Invalid host specified"); + } + conf.set("fs.ftp.host", host); + + // get port information from uri, (overrides info in conf) + int port = uri.getPort(); + port = (port == -1) ? FTP.DEFAULT_PORT : port; + conf.setInt("fs.ftp.host.port", port); + + // get user/password information from URI (overrides info in conf) + String userAndPassword = uri.getUserInfo(); + if (userAndPassword == null) { + userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf + .get("fs.ftp.password." + host, null)); + if (userAndPassword == null) { + throw new IOException("Invalid user/passsword specified"); + } + } + String[] userPasswdInfo = userAndPassword.split(":"); + conf.set("fs.ftp.user." + host, userPasswdInfo[0]); + if (userPasswdInfo.length > 1) { + conf.set("fs.ftp.password." + host, userPasswdInfo[1]); + } else { + conf.set("fs.ftp.password." + host, null); + } + setConf(conf); + this.uri = uri; + } + + /** + * Connect to the FTP server using configuration parameters * + * + * @return An FTPClient instance + * @throws IOException + */ + private FTPClient connect() throws IOException { + FTPClient client = null; + Configuration conf = getConf(); + String host = conf.get("fs.ftp.host"); + int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT); + String user = conf.get("fs.ftp.user." + host); + String password = conf.get("fs.ftp.password." 
+ host); + client = new FTPClient(); + client.connect(host, port); + int reply = client.getReplyCode(); + if (!FTPReply.isPositiveCompletion(reply)) { + throw new IOException("Server - " + host + + " refused connection on port - " + port); + } else if (client.login(user, password)) { + client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); + client.setFileType(FTP.BINARY_FILE_TYPE); + client.setBufferSize(DEFAULT_BUFFER_SIZE); + } else { + throw new IOException("Login failed on server - " + host + ", port - " + + port); + } + + return client; + } + + /** + * Logout and disconnect the given FTPClient. * + * + * @param client + * @throws IOException + */ + private void disconnect(FTPClient client) throws IOException { + if (client != null) { + if (!client.isConnected()) { + throw new FTPException("Client not connected"); + } + boolean logoutSuccess = client.logout(); + client.disconnect(); + if (!logoutSuccess) { + LOG.warn("Logout failed while disconnecting, error code - " + + client.getReplyCode()); + } + } + } + + /** + * Resolve against given working directory. * + * + * @param workDir + * @param path + * @return + */ + private Path makeAbsolute(Path workDir, Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workDir, path); + } + + @Override + public FSDataInputStream open(Path file, int bufferSize) throws IOException { + FTPClient client = connect(); + Path workDir = new Path(client.printWorkingDirectory()); + Path absolute = makeAbsolute(workDir, file); + FileStatus fileStat = getFileStatus(client, absolute); + if (fileStat.isDir()) { + disconnect(client); + throw new IOException("Path " + file + " is a directory."); + } + client.allocate(bufferSize); + Path parent = absolute.getParent(); + // Change to parent directory on the + // server. Only then can we read the + // file + // on the server by opening up an InputStream. As a side effect the working + // directory on the server is changed to the parent directory of the file. 
+ // The FTP client connection is closed when close() is called on the + // FSDataInputStream. + client.changeWorkingDirectory(parent.toUri().getPath()); + InputStream is = client.retrieveFileStream(file.getName()); + FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is, + client, statistics)); + if (!FTPReply.isPositivePreliminary(client.getReplyCode())) { + // The ftpClient is an inconsistent state. Must close the stream + // which in turn will logout and disconnect from FTP server + fis.close(); + throw new IOException("Unable to open file: " + file + ", Aborting"); + } + return fis; + } + + /** + * A stream obtained via this call must be closed before using other APIs of + * this class or else the invocation will block. + */ + @Override + public FSDataOutputStream create(Path file, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + final FTPClient client = connect(); + Path workDir = new Path(client.printWorkingDirectory()); + Path absolute = makeAbsolute(workDir, file); + if (exists(client, file)) { + if (overwrite) { + delete(client, file); + } else { + disconnect(client); + throw new IOException("File already exists: " + file); + } + } + Path parent = absolute.getParent(); + if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) { + parent = (parent == null) ? new Path("/") : parent; + disconnect(client); + throw new IOException("create(): Mkdirs failed to create: " + parent); + } + client.allocate(bufferSize); + // Change to parent directory on the server. Only then can we write to the + // file on the server by opening up an OutputStream. As a side effect the + // working directory on the server is changed to the parent directory of the + // file. The FTP client connection is closed when close() is called on the + // FSDataOutputStream. 
+ client.changeWorkingDirectory(parent.toUri().getPath()); + FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file + .getName()), statistics) { + @Override + public void close() throws IOException { + super.close(); + if (!client.isConnected()) { + throw new FTPException("Client not connected"); + } + boolean cmdCompleted = client.completePendingCommand(); + disconnect(client); + if (!cmdCompleted) { + throw new FTPException("Could not complete transfer, Reply Code - " + + client.getReplyCode()); + } + } + }; + if (!FTPReply.isPositivePreliminary(client.getReplyCode())) { + // The ftpClient is an inconsistent state. Must close the stream + // which in turn will logout and disconnect from FTP server + fos.close(); + throw new IOException("Unable to create file: " + file + ", Aborting"); + } + return fos; + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. 
+ */ + private boolean exists(FTPClient client, Path file) { + try { + return getFileStatus(client, file) != null; + } catch (FileNotFoundException fnfe) { + return false; + } catch (IOException ioe) { + throw new FTPException("Failed to get file status", ioe); + } + } + + /** @deprecated Use delete(Path, boolean) instead */ + @Override + @Deprecated + public boolean delete(Path file) throws IOException { + return delete(file, false); + } + + @Override + public boolean delete(Path file, boolean recursive) throws IOException { + FTPClient client = connect(); + try { + boolean success = delete(client, file, recursive); + return success; + } finally { + disconnect(client); + } + } + + /** @deprecated Use delete(Path, boolean) instead */ + @Deprecated + private boolean delete(FTPClient client, Path file) throws IOException { + return delete(client, file, false); + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. 
+ */ + private boolean delete(FTPClient client, Path file, boolean recursive) + throws IOException { + Path workDir = new Path(client.printWorkingDirectory()); + Path absolute = makeAbsolute(workDir, file); + String pathName = absolute.toUri().getPath(); + FileStatus fileStat = getFileStatus(client, absolute); + if (!fileStat.isDir()) { + return client.deleteFile(pathName); + } + FileStatus[] dirEntries = listStatus(client, absolute); + if (dirEntries != null && dirEntries.length > 0 && !(recursive)) { + throw new IOException("Directory: " + file + " is not empty."); + } + if (dirEntries != null) { + for (int i = 0; i < dirEntries.length; i++) { + delete(client, new Path(absolute, dirEntries[i].getPath()), recursive); + } + } + return client.removeDirectory(pathName); + } + + private FsAction getFsAction(int accessGroup, FTPFile ftpFile) { + FsAction action = FsAction.NONE; + if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) { + action.or(FsAction.READ); + } + if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) { + action.or(FsAction.WRITE); + } + if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) { + action.or(FsAction.EXECUTE); + } + return action; + } + + private FsPermission getPermissions(FTPFile ftpFile) { + FsAction user, group, others; + user = getFsAction(FTPFile.USER_ACCESS, ftpFile); + group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile); + others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile); + return new FsPermission(user, group, others); + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public FileStatus[] listStatus(Path file) throws IOException { + FTPClient client = connect(); + try { + FileStatus[] stats = listStatus(client, file); + return stats; + } finally { + disconnect(client); + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. 
Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private FileStatus[] listStatus(FTPClient client, Path file) + throws IOException { + Path workDir = new Path(client.printWorkingDirectory()); + Path absolute = makeAbsolute(workDir, file); + FileStatus fileStat = getFileStatus(client, absolute); + if (!fileStat.isDir()) { + return new FileStatus[] { fileStat }; + } + FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath()); + FileStatus[] fileStats = new FileStatus[ftpFiles.length]; + for (int i = 0; i < ftpFiles.length; i++) { + fileStats[i] = getFileStatus(ftpFiles[i], absolute); + } + return fileStats; + } + + @Override + public FileStatus getFileStatus(Path file) throws IOException { + FTPClient client = connect(); + try { + FileStatus status = getFileStatus(client, file); + return status; + } finally { + disconnect(client); + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private FileStatus getFileStatus(FTPClient client, Path file) + throws IOException { + FileStatus fileStat = null; + Path workDir = new Path(client.printWorkingDirectory()); + Path absolute = makeAbsolute(workDir, file); + Path parentPath = absolute.getParent(); + if (parentPath == null) { // root dir + long length = -1; // Length of root dir on server not known + boolean isDir = true; + int blockReplication = 1; + long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known. + long modTime = -1; // Modification time of root dir not known. 
+ Path root = new Path("/"); + return new FileStatus(length, isDir, blockReplication, blockSize, + modTime, root); + } + String pathName = parentPath.toUri().getPath(); + FTPFile[] ftpFiles = client.listFiles(pathName); + if (ftpFiles != null) { + for (FTPFile ftpFile : ftpFiles) { + if (ftpFile.getName().equals(file.getName())) { // file found in dir + fileStat = getFileStatus(ftpFile, parentPath); + break; + } + } + if (fileStat == null) { + throw new FileNotFoundException("File " + file + " does not exist."); + } + } else { + throw new FileNotFoundException("File " + file + " does not exist."); + } + return fileStat; + } + + /** + * Convert the file information in FTPFile to a {@link FileStatus} object. * + * + * @param ftpFile + * @param parentPath + * @return FileStatus + */ + private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) { + long length = ftpFile.getSize(); + boolean isDir = ftpFile.isDirectory(); + int blockReplication = 1; + // Using default block size since there is no way in FTP client to know of + // block sizes on server. The assumption could be less than ideal. + long blockSize = DEFAULT_BLOCK_SIZE; + long modTime = ftpFile.getTimestamp().getTimeInMillis(); + FsPermission permission = getPermissions(ftpFile); + String user = ftpFile.getUser(); + String group = ftpFile.getGroup(); + Path filePath = new Path(parentPath, ftpFile.getName()); + return new FileStatus(length, isDir, blockReplication, blockSize, modTime, + permission, user, group, filePath); + } + + @Override + public boolean mkdirs(Path file, FsPermission permission) throws IOException { + FTPClient client = connect(); + try { + boolean success = mkdirs(client, file, permission); + return success; + } finally { + disconnect(client); + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. 
+ */ + private boolean mkdirs(FTPClient client, Path file, FsPermission permission) + throws IOException { + boolean created = true; + Path workDir = new Path(client.printWorkingDirectory()); + Path absolute = makeAbsolute(workDir, file); + String pathName = absolute.getName(); + if (!exists(client, absolute)) { + Path parent = absolute.getParent(); + created = (parent == null || mkdirs(client, parent, FsPermission + .getDefault())); + if (created) { + String parentDir = parent.toUri().getPath(); + client.changeWorkingDirectory(parentDir); + created = created & client.makeDirectory(pathName); + } + } else if (isFile(client, absolute)) { + throw new IOException(String.format( + "Can't make directory for path %s since it is a file.", absolute)); + } + return created; + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private boolean isFile(FTPClient client, Path file) { + try { + return !getFileStatus(client, file).isDir(); + } catch (FileNotFoundException e) { + return false; // file does not exist + } catch (IOException ioe) { + throw new FTPException("File check failed", ioe); + } + } + + /* + * Assuming that parent of both source and destination is the same. Is the + * assumption correct or it is suppose to work like 'move' ? + */ + @Override + public boolean rename(Path src, Path dst) throws IOException { + FTPClient client = connect(); + try { + boolean success = rename(client, src, dst); + return success; + } finally { + disconnect(client); + } + } + + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. 
+ * + * @param client + * @param src + * @param dst + * @return + * @throws IOException + */ + private boolean rename(FTPClient client, Path src, Path dst) + throws IOException { + Path workDir = new Path(client.printWorkingDirectory()); + Path absoluteSrc = makeAbsolute(workDir, src); + Path absoluteDst = makeAbsolute(workDir, dst); + if (!exists(client, absoluteSrc)) { + throw new IOException("Source path " + src + " does not exist"); + } + if (exists(client, absoluteDst)) { + throw new IOException("Destination path " + dst + + " already exist, cannot rename!"); + } + String parentSrc = absoluteSrc.getParent().toUri().toString(); + String parentDst = absoluteDst.getParent().toUri().toString(); + String from = src.getName(); + String to = dst.getName(); + if (!parentSrc.equals(parentDst)) { + throw new IOException("Cannot rename parent(source): " + parentSrc + + ", parent(destination): " + parentDst); + } + client.changeWorkingDirectory(parentSrc); + boolean renamed = client.rename(from, to); + return renamed; + } + + @Override + public Path getWorkingDirectory() { + // Return home directory always since we do not maintain state. 
+ return getHomeDirectory(); + } + + @Override + public Path getHomeDirectory() { + FTPClient client = null; + try { + client = connect(); + Path homeDir = new Path(client.printWorkingDirectory()); + return homeDir; + } catch (IOException ioe) { + throw new FTPException("Failed to get home directory", ioe); + } finally { + try { + disconnect(client); + } catch (IOException ioe) { + throw new FTPException("Failed to disconnect", ioe); + } + } + } + + @Override + public void setWorkingDirectory(Path newDir) { + // we do not maintain the working directory state + } +} Added: hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPInputStream.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPInputStream.java?rev=661473&view=auto ============================================================================== --- hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPInputStream.java (added) +++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/ftp/FTPInputStream.java Thu May 29 13:39:27 2008 @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.fs.ftp;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;

/**
 * An {@link FSInputStream} that wraps the data-connection stream of a
 * commons-net {@link FTPClient}.
 *
 * <p>FTP data connections are strictly sequential, so {@code seek},
 * {@code seekToNewSource}, and mark/reset are all unsupported. Closing the
 * stream completes the pending FTP transfer command and then logs out and
 * disconnects the client, so the stream (and its client) are single-use.
 */
public class FTPInputStream extends FSInputStream {

  // Underlying FTP data-connection stream; never null after construction.
  InputStream wrappedStream;
  // Client that owns the transfer; must be connected while the stream is open.
  FTPClient client;
  // Optional byte-count accounting for the owning FileSystem; may be null.
  FileSystem.Statistics stats;
  // Set by close(); subsequent reads and a second close() throw IOException.
  boolean closed;
  // Number of bytes read so far, reported by getPos().
  long pos;

  /**
   * @param stream the FTP data-connection stream to read from (non-null)
   * @param client a connected FTP client that issued the transfer (non-null)
   * @param stats  optional read-statistics sink; may be null
   * @throws IllegalArgumentException if stream is null, or client is null
   *         or not connected
   */
  public FTPInputStream(InputStream stream, FTPClient client,
      FileSystem.Statistics stats) {
    if (stream == null) {
      throw new IllegalArgumentException("Null InputStream");
    }
    if (client == null || !client.isConnected()) {
      throw new IllegalArgumentException("FTP client null or not connected");
    }
    this.wrappedStream = stream;
    this.client = client;
    this.stats = stats;
    this.pos = 0;
    this.closed = false;
  }

  /** @return the number of bytes read so far. */
  public long getPos() throws IOException {
    return pos;
  }

  /** FTP streams are sequential-only; seek always fails. */
  public void seek(long pos) throws IOException {
    throw new IOException("Seek not supported");
  }

  /** FTP streams are sequential-only; seek always fails. */
  public boolean seekToNewSource(long targetPos) throws IOException {
    throw new IOException("Seek not supported");
  }

  public synchronized int read() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }

    int byteRead = wrappedStream.read();
    if (byteRead >= 0) {
      pos++;
    }
    // && (not &): only touch stats when present and a byte was actually read.
    if (stats != null && byteRead >= 0) {
      stats.incrementBytesRead(1);
    }
    return byteRead;
  }

  public synchronized int read(byte buf[], int off, int len) throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }

    int result = wrappedStream.read(buf, off, len);
    if (result > 0) {
      pos += result;
    }
    // && (not &): only touch stats when present and bytes were actually read.
    if (stats != null && result > 0) {
      stats.incrementBytesRead(result);
    }

    return result;
  }

  /**
   * Closes the stream, completes the pending FTP transfer, and disconnects
   * the client. Throws IOException if already closed, and FTPException if the
   * client has dropped its connection or the transfer did not complete.
   */
  public synchronized void close() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
    super.close();
    closed = true;
    if (!client.isConnected()) {
      throw new FTPException("Client not connected");
    }

    // Finish the transfer first, but always log out and disconnect before
    // deciding whether to report a failure.
    boolean cmdCompleted = client.completePendingCommand();
    client.logout();
    client.disconnect();
    if (!cmdCompleted) {
      throw new FTPException("Could not complete transfer, Reply Code - "
          + client.getReplyCode());
    }
  }

  // Mark/reset are not supported on a sequential FTP data stream.

  public boolean markSupported() {
    return false;
  }

  public void mark(int readLimit) {
    // Do nothing
  }

  public void reset() throws IOException {
    throw new IOException("Mark not supported");
  }
}
+ + fs.ftp.user.localhost + user + The username for connecting to FTP server running on localhost. + This is required by FTPFileSystem + + + + fs.ftp.password.localhost + password + The password for connecting to FTP server running on localhost. + This is required by FTPFileSystem + + Added: hadoop/core/trunk/src/test/lib/ftplet-api-1.0.0-SNAPSHOT.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/lib/ftplet-api-1.0.0-SNAPSHOT.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/core/trunk/src/test/lib/ftplet-api-1.0.0-SNAPSHOT.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/src/test/lib/ftpserver-core-1.0.0-SNAPSHOT.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/lib/ftpserver-core-1.0.0-SNAPSHOT.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/core/trunk/src/test/lib/ftpserver-core-1.0.0-SNAPSHOT.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/src/test/lib/ftpserver-server-1.0.0-SNAPSHOT.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/lib/ftpserver-server-1.0.0-SNAPSHOT.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. 
Propchange: hadoop/core/trunk/src/test/lib/ftpserver-server-1.0.0-SNAPSHOT.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/core/trunk/src/test/lib/mina-core-2.0.0-M2-20080407.124109-12.jar URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/lib/mina-core-2.0.0-M2-20080407.124109-12.jar?rev=661473&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/core/trunk/src/test/lib/mina-core-2.0.0-M2-20080407.124109-12.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java?rev=661473&r1=661472&r2=661473&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/DFSTestUtil.java Thu May 29 13:39:27 2008 @@ -94,7 +94,7 @@ long getSeed() { return seed; } } - void createFiles(FileSystem fs, String topdir) throws IOException { + public void createFiles(FileSystem fs, String topdir) throws IOException { createFiles(fs, topdir, (short)3); } @@ -138,7 +138,7 @@ } /** check if the files have been copied correctly. 
*/ - boolean checkFiles(FileSystem fs, String topdir) throws IOException { + public boolean checkFiles(FileSystem fs, String topdir) throws IOException { //Configuration conf = new Configuration(); Path root = new Path(topdir); Added: hadoop/core/trunk/src/test/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java?rev=661473&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java Thu May 29 13:39:27 2008 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.fs.ftp;

import java.net.URI;
import junit.framework.TestCase;

import org.apache.ftpserver.DefaultFtpServerContext;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.ftplet.Authority;
import org.apache.ftpserver.ftplet.UserManager;
import org.apache.ftpserver.listener.mina.MinaListener;
import org.apache.ftpserver.usermanager.BaseUser;
import org.apache.ftpserver.usermanager.WritePermission;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.DFSTestUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

/**
 * Generates a bunch of random files and directories using class 'DFSTestUtil',
 * stores them on the FTP file system, copies them and check if all the files
 * were retrieved successfully without any data corruption
 */
public class TestFTPFileSystem extends TestCase {

  private Configuration defaultConf = new JobConf();
  private FtpServer server = null;
  private FileSystem localFs = null;
  private FileSystem ftpFs = null;

  // Per-test scratch directory under the build's test-data root.
  private Path workDir = new Path(new Path(System.getProperty(
      "test.build.data", "."), "data"), "TestFTPFileSystem");

  Path ftpServerRoot = new Path(workDir, "FTPServer");
  Path ftpServerConfig = null;

  /**
   * Starts an embedded Apache FtpServer on an OS-assigned free port with a
   * single enabled "admin"/"admin" user that has write permission and no
   * idle timeout. Wraps any start-up failure in a RuntimeException.
   */
  private void startServer() {
    try {
      DefaultFtpServerContext context = new DefaultFtpServerContext(false);
      MinaListener listener = new MinaListener();
      // Set port to 0 for OS to give a free port
      listener.setPort(0);
      context.setListener("default", listener);

      // Create a test user.
      UserManager userManager = context.getUserManager();
      BaseUser adminUser = new BaseUser();
      adminUser.setName("admin");
      adminUser.setPassword("admin");
      adminUser.setEnabled(true);
      adminUser.setAuthorities(new Authority[] { new WritePermission() });

      Path adminUserHome = new Path(ftpServerRoot, "user/admin");
      adminUser.setHomeDirectory(adminUserHome.toUri().getPath());
      // 0 == never time out an idle session during the test.
      adminUser.setMaxIdleTime(0);
      userManager.save(adminUser);

      // Initialize the server and start.
      server = new FtpServer(context);
      server.start();

    } catch (Exception e) {
      throw new RuntimeException("FTP server start-up failed", e);
    }
  }

  /** Stops the embedded FTP server if it was started. */
  private void stopServer() {
    if (server != null) {
      server.stop();
    }
  }

  @Override
  public void setUp() throws Exception {
    startServer();
    defaultConf = new Configuration();
    localFs = FileSystem.getLocal(defaultConf);
    ftpServerConfig = new Path(localFs.getWorkingDirectory(), "res");
    // Recover the actual port the OS handed to the listener so the client
    // URI below connects to the right place.
    MinaListener listener = (MinaListener) server.getServerContext()
        .getListener("default");
    int serverPort = listener.getPort();
    ftpFs = FileSystem.get(URI.create("ftp://admin:admin@localhost:"
        + serverPort), defaultConf);
  }

  @Override
  public void tearDown() throws Exception {
    localFs.delete(ftpServerRoot, true);
    localFs.delete(ftpServerConfig, true);
    localFs.close();
    ftpFs.close();
    stopServer();
  }

  /**
   * Tests FTPFileSystem, create(), open(), delete(), mkdirs(), rename(),
   * listStatus(), getStatus() APIs. *
   *
   * @throws Exception
   */
  public void testReadWrite() throws Exception {

    DFSTestUtil util = new DFSTestUtil("TestFTPFileSystem", 20, 3, 1024 * 1024);
    localFs.setWorkingDirectory(workDir);
    Path localData = new Path(workDir, "srcData");
    Path remoteData = new Path("srcData");

    util.createFiles(localFs, localData.toUri().getPath());

    boolean dataConsistency = util.checkFiles(localFs, localData.getName());
    assertTrue("Test data corrupted", dataConsistency);

    // Copy files and directories recursively to FTP file system.
    boolean filesCopied = FileUtil.copy(localFs, localData, ftpFs, remoteData,
        false, defaultConf);
    assertTrue("Copying to FTPFileSystem failed", filesCopied);

    // Rename the remote copy
    Path renamedData = new Path("Renamed");
    boolean renamed = ftpFs.rename(remoteData, renamedData);
    assertTrue("Rename failed", renamed);

    // Copy files and directories from FTP file system and delete remote copy.
    filesCopied = FileUtil.copy(ftpFs, renamedData, localFs, workDir, true,
        defaultConf);
    assertTrue("Copying from FTPFileSystem fails", filesCopied);

    // Check if the data was received completely without any corruption.
    dataConsistency = util.checkFiles(localFs, renamedData.getName());
    assertTrue("Invalid or corrupted data received from FTP Server!",
        dataConsistency);

    // Delete local copies. Non-short-circuit & is deliberate: both deletes
    // must execute even if the first one fails.
    boolean deleteSuccess = localFs.delete(renamedData, true)
        & localFs.delete(localData, true);
    assertTrue("Local test data deletion failed", deleteSuccess);
  }
}