Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EF47718612 for ; Wed, 30 Sep 2015 15:41:10 +0000 (UTC) Received: (qmail 86837 invoked by uid 500); 30 Sep 2015 15:41:02 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 86623 invoked by uid 500); 30 Sep 2015 15:41:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 85435 invoked by uid 99); 30 Sep 2015 15:41:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Sep 2015 15:41:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8C909E0AD2; Wed, 30 Sep 2015 15:41:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhz@apache.org To: common-commits@hadoop.apache.org Date: Wed, 30 Sep 2015 15:41:15 -0000 Message-Id: <822bb806d5bf4acfa78908cf7943ac23@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/58] [abbrv] hadoop git commit: HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. Contributed by Mingliang Liu. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java deleted file mode 100644 index 1d20f82..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ /dev/null @@ -1,2262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.BlockStoragePolicySpi; -import org.apache.hadoop.fs.CacheFlag; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FSLinkResolver; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileChecksum; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileSystemLinkResolver; -import org.apache.hadoop.fs.FsServerDefaults; -import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.XAttrSetFlag; -import org.apache.hadoop.fs.Options.ChecksumOpt; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.HdfsAdmin; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator; -import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; -import org.apache.hadoop.hdfs.protocol.CachePoolEntry; -import org.apache.hadoop.hdfs.protocol.CachePoolInfo; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.EncryptionZone; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; -import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; -import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; -import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - - -/**************************************************************** - * Implementation of the abstract FileSystem for the DFS system. - * This object is the way end-user code interacts with a Hadoop - * DistributedFileSystem. - * - *****************************************************************/ -@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" }) -@InterfaceStability.Unstable -public class DistributedFileSystem extends FileSystem { - private Path workingDir; - private URI uri; - private String homeDirPrefix = - HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT; - - DFSClient dfs; - private boolean verifyChecksum = true; - - static{ - HdfsConfiguration.init(); - } - - public DistributedFileSystem() { - } - - /** - * Return the protocol scheme for the FileSystem. - *

- * - * @return hdfs - */ - @Override - public String getScheme() { - return HdfsConstants.HDFS_URI_SCHEME; - } - - @Override - public URI getUri() { return uri; } - - @Override - public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - setConf(conf); - - String host = uri.getHost(); - if (host == null) { - throw new IOException("Incomplete HDFS URI, no host: "+ uri); - } - homeDirPrefix = conf.get( - HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY, - HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT); - - this.dfs = new DFSClient(uri, conf, statistics); - this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); - this.workingDir = getHomeDirectory(); - } - - @Override - public Path getWorkingDirectory() { - return workingDir; - } - - @Override - public long getDefaultBlockSize() { - return dfs.getConf().getDefaultBlockSize(); - } - - @Override - public short getDefaultReplication() { - return dfs.getConf().getDefaultReplication(); - } - - @Override - public void setWorkingDirectory(Path dir) { - String result = fixRelativePart(dir).toUri().getPath(); - if (!DFSUtil.isValidName(result)) { - throw new IllegalArgumentException("Invalid DFS directory name " + - result); - } - workingDir = fixRelativePart(dir); - } - - @Override - public Path getHomeDirectory() { - return makeQualified(new Path(homeDirPrefix + "/" - + dfs.ugi.getShortUserName())); - } - - /** - * Checks that the passed URI belongs to this filesystem and returns - * just the path component. Expects a URI with an absolute path. - * - * @param file URI with absolute path - * @return path component of {file} - * @throws IllegalArgumentException if URI does not belong to this DFS - */ - private String getPathName(Path file) { - checkPath(file); - String result = file.toUri().getPath(); - if (!DFSUtil.isValidName(result)) { - throw new IllegalArgumentException("Pathname " + result + " from " + - file+" is not a valid DFS filename."); - } - return result; - } - - @Override - public BlockLocation[] getFileBlockLocations(FileStatus file, long start, - long len) throws IOException { - if (file == null) { - return null; - } - return getFileBlockLocations(file.getPath(), start, len); - } - - @Override - public BlockLocation[] getFileBlockLocations(Path p, - final long start, final long len) throws IOException { - statistics.incrementReadOps(1); - final Path absF = fixRelativePart(p); - return new FileSystemLinkResolver() { - @Override - public BlockLocation[] doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.getBlockLocations(getPathName(p), start, len); - } - @Override - public BlockLocation[] next(final FileSystem fs, final Path p) - throws IOException { - return fs.getFileBlockLocations(p, start, len); - } - }.resolve(this, absF); - } - - @Override - public void setVerifyChecksum(boolean verifyChecksum) { - this.verifyChecksum = verifyChecksum; - } - - /** - * Start the lease recovery of a file - * - * @param f a file - * @return true if the file is already closed - * @throws IOException if an error occurs - */ - public boolean recoverLease(final Path f) throws IOException { - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.recoverLease(getPathName(p)); - } - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - return myDfs.recoverLease(p); - } - throw new UnsupportedOperationException("Cannot recoverLease through" + - " a symlink to a non-DistributedFileSystem: " + f + " -> " + p); - } - }.resolve(this, absF); - } - - @Override - public FSDataInputStream open(Path f, final int bufferSize) - throws IOException { - statistics.incrementReadOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FSDataInputStream doCall(final Path p) - throws IOException, UnresolvedLinkException { - final DFSInputStream dfsis = - dfs.open(getPathName(p), bufferSize, verifyChecksum); - return dfs.createWrappedInputStream(dfsis); - } - @Override - public FSDataInputStream next(final FileSystem fs, final Path p) - throws IOException { - return fs.open(p, bufferSize); - } - }.resolve(this, absF); - } - - @Override - public FSDataOutputStream append(Path f, final int bufferSize, - final Progressable progress) throws IOException { - return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress); - } - - /** - * Append to an existing file (optional operation). - * - * @param f the existing file to be appended. - * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory - * to be present. - * @param bufferSize the size of the buffer to be used. - * @param progress for reporting progress if it is not null. - * @return Returns instance of {@link FSDataOutputStream} - * @throws IOException - */ - public FSDataOutputStream append(Path f, final EnumSet flag, - final int bufferSize, final Progressable progress) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FSDataOutputStream doCall(final Path p) - throws IOException { - return dfs.append(getPathName(p), bufferSize, flag, progress, - statistics); - } - @Override - public FSDataOutputStream next(final FileSystem fs, final Path p) - throws IOException { - return fs.append(p, bufferSize); - } - }.resolve(this, absF); - } - - /** - * Append to an existing file (optional operation). - * - * @param f the existing file to be appended. - * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory - * to be present. - * @param bufferSize the size of the buffer to be used. - * @param progress for reporting progress if it is not null. - * @param favoredNodes Favored nodes for new blocks - * @return Returns instance of {@link FSDataOutputStream} - * @throws IOException - */ - public FSDataOutputStream append(Path f, final EnumSet flag, - final int bufferSize, final Progressable progress, - final InetSocketAddress[] favoredNodes) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FSDataOutputStream doCall(final Path p) - throws IOException { - return dfs.append(getPathName(p), bufferSize, flag, progress, - statistics, favoredNodes); - } - @Override - public FSDataOutputStream next(final FileSystem fs, final Path p) - throws IOException { - return fs.append(p, bufferSize); - } - }.resolve(this, absF); - } - - @Override - public FSDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { - return this.create(f, permission, - overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, - blockSize, progress, null); - } - - /** - * Same as - * {@link #create(Path, FsPermission, boolean, int, short, long, - * Progressable)} with the addition of favoredNodes that is a hint to - * where the namenode should place the file blocks. - * The favored nodes hint is not persisted in HDFS. Hence it may be honored - * at the creation time only. And with favored nodes, blocks will be pinned - * on the datanodes to prevent balancing move the block. HDFS could move the - * blocks during replication, to move the blocks from favored nodes. A value - * of null means no favored nodes for this create - */ - public HdfsDataOutputStream create(final Path f, - final FsPermission permission, final boolean overwrite, - final int bufferSize, final short replication, final long blockSize, - final Progressable progress, final InetSocketAddress[] favoredNodes) - throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public HdfsDataOutputStream doCall(final Path p) - throws IOException, UnresolvedLinkException { - final DFSOutputStream out = dfs.create(getPathName(f), permission, - overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - : EnumSet.of(CreateFlag.CREATE), - true, replication, blockSize, progress, bufferSize, null, - favoredNodes); - return dfs.createWrappedOutputStream(out, statistics); - } - @Override - public HdfsDataOutputStream next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - return myDfs.create(p, permission, overwrite, bufferSize, replication, - blockSize, progress, favoredNodes); - } - throw new UnsupportedOperationException("Cannot create with" + - " favoredNodes through a symlink to a non-DistributedFileSystem: " - + f + " -> " + p); - } - }.resolve(this, absF); - } - - @Override - public FSDataOutputStream create(final Path f, final FsPermission permission, - final EnumSet cflags, final int bufferSize, - final short replication, final long blockSize, final Progressable progress, - final ChecksumOpt checksumOpt) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FSDataOutputStream doCall(final Path p) - throws IOException, UnresolvedLinkException { - final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, - cflags, replication, blockSize, progress, bufferSize, - checksumOpt); - return dfs.createWrappedOutputStream(dfsos, statistics); - } - @Override - public FSDataOutputStream next(final FileSystem fs, final Path p) - throws IOException { - return fs.create(p, permission, cflags, bufferSize, - replication, blockSize, progress, checksumOpt); - } - }.resolve(this, absF); - } - - @Override - protected HdfsDataOutputStream primitiveCreate(Path f, - FsPermission absolutePermission, EnumSet flag, int bufferSize, - short replication, long blockSize, Progressable progress, - ChecksumOpt checksumOpt) throws IOException { - statistics.incrementWriteOps(1); - final DFSOutputStream dfsos = dfs.primitiveCreate( - getPathName(fixRelativePart(f)), - absolutePermission, flag, true, replication, blockSize, - progress, bufferSize, checksumOpt); - return dfs.createWrappedOutputStream(dfsos, statistics); - } - - /** - * Same as create(), except fails if parent directory doesn't already exist. - */ - @Override - @SuppressWarnings("deprecation") - public FSDataOutputStream createNonRecursive(final Path f, - final FsPermission permission, final EnumSet flag, - final int bufferSize, final short replication, final long blockSize, - final Progressable progress) throws IOException { - statistics.incrementWriteOps(1); - if (flag.contains(CreateFlag.OVERWRITE)) { - flag.add(CreateFlag.CREATE); - } - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FSDataOutputStream doCall(final Path p) throws IOException, - UnresolvedLinkException { - final DFSOutputStream dfsos = dfs.create(getPathName(p), permission, - flag, false, replication, blockSize, progress, bufferSize, null); - return dfs.createWrappedOutputStream(dfsos, statistics); - } - - @Override - public FSDataOutputStream next(final FileSystem fs, final Path p) - throws IOException { - return fs.createNonRecursive(p, permission, flag, bufferSize, - replication, blockSize, progress); - } - }.resolve(this, absF); - } - - @Override - public boolean setReplication(Path src, - final short replication - ) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(src); - return new FileSystemLinkResolver() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.setReplication(getPathName(p), replication); - } - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - return fs.setReplication(p, replication); - } - }.resolve(this, absF); - } - - /** - * Set the source path to the specified storage policy. - * - * @param src The source path referring to either a directory or a file. - * @param policyName The name of the storage policy. - */ - @Override - public void setStoragePolicy(final Path src, final String policyName) - throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(src); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.setStoragePolicy(getPathName(p), policyName); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - fs.setStoragePolicy(p, policyName); - return null; - } - }.resolve(this, absF); - } - - @Override - public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException { - statistics.incrementReadOps(1); - Path absF = fixRelativePart(path); - - return new FileSystemLinkResolver() { - @Override - public BlockStoragePolicySpi doCall(final Path p) throws IOException { - return getClient().getStoragePolicy(getPathName(p)); - } - - @Override - public BlockStoragePolicySpi next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getStoragePolicy(p); - } - }.resolve(this, absF); - } - - @Override - public Collection getAllStoragePolicies() - throws IOException { - return Arrays.asList(dfs.getStoragePolicies()); - } - - /** - * Deprecated. Prefer {@link FileSystem#getAllStoragePolicies()} - * @return - * @throws IOException - */ - @Deprecated - public BlockStoragePolicy[] getStoragePolicies() throws IOException { - statistics.incrementReadOps(1); - return dfs.getStoragePolicies(); - } - - /** - * Move blocks from srcs to trg and delete srcs afterwards. - * The file block sizes must be the same. - * - * @param trg existing file to append to - * @param psrcs list of files (same block size, same replication) - * @throws IOException - */ - @Override - public void concat(Path trg, Path [] psrcs) throws IOException { - statistics.incrementWriteOps(1); - // Make target absolute - Path absF = fixRelativePart(trg); - // Make all srcs absolute - Path[] srcs = new Path[psrcs.length]; - for (int i=0; i " + stat.getPath()); - } - absF = fixRelativePart(stat.getPath()); - for (int i=0; i " + stat.getPath()); - } - srcs[i] = fixRelativePart(stat.getPath()); - } - // Try concat again. Can still race with another symlink. - for (int i=0; i() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.rename(getPathName(source), getPathName(p)); - } - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - // Should just throw an error in FileSystem#checkPath - return doCall(p); - } - }.resolve(this, absDst); - } - } - - /** - * This rename operation is guaranteed to be atomic. - */ - @SuppressWarnings("deprecation") - @Override - public void rename(Path src, Path dst, final Options.Rename... options) - throws IOException { - statistics.incrementWriteOps(1); - final Path absSrc = fixRelativePart(src); - final Path absDst = fixRelativePart(dst); - // Try the rename without resolving first - try { - dfs.rename(getPathName(absSrc), getPathName(absDst), options); - } catch (UnresolvedLinkException e) { - // Fully resolve the source - final Path source = getFileLinkStatus(absSrc).getPath(); - // Keep trying to resolve the destination - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.rename(getPathName(source), getPathName(p), options); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - // Should just throw an error in FileSystem#checkPath - return doCall(p); - } - }.resolve(this, absDst); - } - } - - @Override - public boolean truncate(Path f, final long newLength) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.truncate(getPathName(p), newLength); - } - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - return fs.truncate(p, newLength); - } - }.resolve(this, absF); - } - - @Override - public boolean delete(Path f, final boolean recursive) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.delete(getPathName(p), recursive); - } - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - return fs.delete(p, recursive); - } - }.resolve(this, absF); - } - - @Override - public ContentSummary getContentSummary(Path f) throws IOException { - statistics.incrementReadOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public ContentSummary doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.getContentSummary(getPathName(p)); - } - @Override - public ContentSummary next(final FileSystem fs, final Path p) - throws IOException { - return fs.getContentSummary(p); - } - }.resolve(this, absF); - } - - /** Set a directory's quotas - * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType) - */ - public void setQuota(Path src, final long namespaceQuota, - final long storagespaceQuota) throws IOException { - Path absF = fixRelativePart(src); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.setQuota(getPathName(p), namespaceQuota, storagespaceQuota); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - // setQuota is not defined in FileSystem, so we only can resolve - // within this DFS - return doCall(p); - } - }.resolve(this, absF); - } - - /** - * Set the per type storage quota of a directory. - * - * @param src target directory whose quota is to be modified. - * @param type storage type of the specific storage type quota to be modified. - * @param quota value of the specific storage type quota to be modified. - * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type. - */ - public void setQuotaByStorageType( - Path src, final StorageType type, final long quota) - throws IOException { - Path absF = fixRelativePart(src); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.setQuotaByStorageType(getPathName(p), type, quota); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - // setQuotaByStorageType is not defined in FileSystem, so we only can resolve - // within this DFS - return doCall(p); - } - }.resolve(this, absF); - } - - private FileStatus[] listStatusInternal(Path p) throws IOException { - String src = getPathName(p); - - // fetch the first batch of entries in the directory - DirectoryListing thisListing = dfs.listPaths( - src, HdfsFileStatus.EMPTY_NAME); - - if (thisListing == null) { // the directory does not exist - throw new FileNotFoundException("File " + p + " does not exist."); - } - - HdfsFileStatus[] partialListing = thisListing.getPartialListing(); - if (!thisListing.hasMore()) { // got all entries of the directory - FileStatus[] stats = new FileStatus[partialListing.length]; - for (int i = 0; i < partialListing.length; i++) { - stats[i] = partialListing[i].makeQualified(getUri(), p); - } - statistics.incrementReadOps(1); - return stats; - } - - // The directory size is too big that it needs to fetch more - // estimate the total number of entries in the directory - int totalNumEntries = - partialListing.length + thisListing.getRemainingEntries(); - ArrayList listing = - new ArrayList(totalNumEntries); - // add the first batch of entries to the array list - for (HdfsFileStatus fileStatus : partialListing) { - listing.add(fileStatus.makeQualified(getUri(), p)); - } - statistics.incrementLargeReadOps(1); - - // now fetch more entries - do { - thisListing = dfs.listPaths(src, thisListing.getLastName()); - - if (thisListing == null) { // the directory is deleted - throw new FileNotFoundException("File " + p + " does not exist."); - } - - partialListing = thisListing.getPartialListing(); - for (HdfsFileStatus fileStatus : partialListing) { - listing.add(fileStatus.makeQualified(getUri(), p)); - } - statistics.incrementLargeReadOps(1); - } while (thisListing.hasMore()); - - return listing.toArray(new FileStatus[listing.size()]); - } - - /** - * List all the entries of a directory - * - * Note that this operation is not atomic for a large directory. - * The entries of a directory may be fetched from NameNode multiple times. - * It only guarantees that each name occurs once if a directory - * undergoes changes between the calls. - */ - @Override - public FileStatus[] listStatus(Path p) throws IOException { - Path absF = fixRelativePart(p); - return new FileSystemLinkResolver() { - @Override - public FileStatus[] doCall(final Path p) - throws IOException, UnresolvedLinkException { - return listStatusInternal(p); - } - @Override - public FileStatus[] next(final FileSystem fs, final Path p) - throws IOException { - return fs.listStatus(p); - } - }.resolve(this, absF); - } - - @Override - protected RemoteIterator listLocatedStatus(final Path p, - final PathFilter filter) - throws IOException { - Path absF = fixRelativePart(p); - return new FileSystemLinkResolver>() { - @Override - public RemoteIterator doCall(final Path p) - throws IOException, UnresolvedLinkException { - return new DirListingIterator(p, filter, true); - } - - @Override - public RemoteIterator next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - return ((DistributedFileSystem)fs).listLocatedStatus(p, filter); - } - // symlink resolution for this methos does not work cross file systems - // because it is a protected method. - throw new IOException("Link resolution does not work with multiple " + - "file systems for listLocatedStatus(): " + p); - } - }.resolve(this, absF); - } - - - /** - * Returns a remote iterator so that followup calls are made on demand - * while consuming the entries. This reduces memory consumption during - * listing of a large directory. - * - * @param p target path - * @return remote iterator - */ - @Override - public RemoteIterator listStatusIterator(final Path p) - throws IOException { - Path absF = fixRelativePart(p); - return new FileSystemLinkResolver>() { - @Override - public RemoteIterator doCall(final Path p) - throws IOException, UnresolvedLinkException { - return new DirListingIterator(p, false); - } - - @Override - public RemoteIterator next(final FileSystem fs, final Path p) - throws IOException { - return ((DistributedFileSystem)fs).listStatusIterator(p); - } - }.resolve(this, absF); - - } - - /** - * This class defines an iterator that returns - * the file status of each file/subdirectory of a directory - * - * if needLocation, status contains block location if it is a file - * throws a RuntimeException with the error as its cause. - * - * @param the type of the file status - */ - private class DirListingIterator - implements RemoteIterator { - private DirectoryListing thisListing; - private int i; - private Path p; - private String src; - private T curStat = null; - private PathFilter filter; - private boolean needLocation; - - private DirListingIterator(Path p, PathFilter filter, - boolean needLocation) throws IOException { - this.p = p; - this.src = getPathName(p); - this.filter = filter; - this.needLocation = needLocation; - // fetch the first batch of entries in the directory - thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, - needLocation); - statistics.incrementReadOps(1); - if (thisListing == null) { // the directory does not exist - throw new FileNotFoundException("File " + p + " does not exist."); - } - i = 0; - } - - private DirListingIterator(Path p, boolean needLocation) - throws IOException { - this(p, null, needLocation); - } - - @Override - @SuppressWarnings("unchecked") - public boolean hasNext() throws IOException { - while (curStat == null && hasNextNoFilter()) { - T next; - HdfsFileStatus fileStat = thisListing.getPartialListing()[i++]; - if (needLocation) { - next = (T)((HdfsLocatedFileStatus)fileStat) - .makeQualifiedLocated(getUri(), p); - } else { - next = (T)fileStat.makeQualified(getUri(), p); - } - // apply filter if not null - if (filter == null || filter.accept(next.getPath())) { - curStat = next; - } - } - return curStat != null; - } - - /** Check if there is a next item before applying the given filter */ - private boolean hasNextNoFilter() throws IOException { - if (thisListing == null) { - return false; - } - if (i >= thisListing.getPartialListing().length - && thisListing.hasMore()) { - // current listing is exhausted & fetch a new listing - thisListing = dfs.listPaths(src, thisListing.getLastName(), - needLocation); - statistics.incrementReadOps(1); - if (thisListing == null) { - return false; - } - i = 0; - } - return (i < thisListing.getPartialListing().length); - } - - @Override - public T next() throws IOException { - if (hasNext()) { - T tmp = curStat; - curStat = null; - return tmp; - } - throw new java.util.NoSuchElementException("No more entry in " + p); - } - } - - /** - * Create a directory, only when the parent directories exist. - * - * See {@link FsPermission#applyUMask(FsPermission)} for details of how - * the permission is applied. - * - * @param f The path to create - * @param permission The permission. See FsPermission#applyUMask for - * details about how this is used to calculate the - * effective permission. - */ - public boolean mkdir(Path f, FsPermission permission) throws IOException { - return mkdirsInternal(f, permission, false); - } - - /** - * Create a directory and its parent directories. - * - * See {@link FsPermission#applyUMask(FsPermission)} for details of how - * the permission is applied. - * - * @param f The path to create - * @param permission The permission. See FsPermission#applyUMask for - * details about how this is used to calculate the - * effective permission. - */ - @Override - public boolean mkdirs(Path f, FsPermission permission) throws IOException { - return mkdirsInternal(f, permission, true); - } - - private boolean mkdirsInternal(Path f, final FsPermission permission, - final boolean createParent) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.mkdirs(getPathName(p), permission, createParent); - } - - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - // FileSystem doesn't have a non-recursive mkdir() method - // Best we can do is error out - if (!createParent) { - throw new IOException("FileSystem does not support non-recursive" - + "mkdir"); - } - return fs.mkdirs(p, permission); - } - }.resolve(this, absF); - } - - @SuppressWarnings("deprecation") - @Override - protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) - throws IOException { - statistics.incrementWriteOps(1); - return dfs.primitiveMkdir(getPathName(f), absolutePermission); - } - - - @Override - public void close() throws IOException { - try { - dfs.closeOutputStreams(false); - super.close(); - } finally { - dfs.close(); - } - } - - @Override - public String toString() { - return "DFS[" + dfs + "]"; - } - - @InterfaceAudience.Private - @VisibleForTesting - public DFSClient getClient() { - return dfs; - } - - @Override - public FsStatus getStatus(Path p) throws IOException { - statistics.incrementReadOps(1); - return dfs.getDiskStatus(); - } - - /** - * Returns count of blocks with no good replicas left. Normally should be - * zero. - * - * @throws IOException - */ - public long getMissingBlocksCount() throws IOException { - return dfs.getMissingBlocksCount(); - } - - /** - * Returns count of blocks with replication factor 1 and have - * lost the only replica. - * - * @throws IOException - */ - public long getMissingReplOneBlocksCount() throws IOException { - return dfs.getMissingReplOneBlocksCount(); - } - - /** - * Returns count of blocks with one of more replica missing. - * - * @throws IOException - */ - public long getUnderReplicatedBlocksCount() throws IOException { - return dfs.getUnderReplicatedBlocksCount(); - } - - /** - * Returns count of blocks with at least one replica marked corrupt. - * - * @throws IOException - */ - public long getCorruptBlocksCount() throws IOException { - return dfs.getCorruptBlocksCount(); - } - - @Override - public RemoteIterator listCorruptFileBlocks(Path path) - throws IOException { - return new CorruptFileBlockIterator(dfs, path); - } - - /** @return datanode statistics. */ - public DatanodeInfo[] getDataNodeStats() throws IOException { - return getDataNodeStats(DatanodeReportType.ALL); - } - - /** @return datanode statistics for the given type. */ - public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type - ) throws IOException { - return dfs.datanodeReport(type); - } - - /** - * Enter, leave or get safe mode. - * - * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode( - * HdfsConstants.SafeModeAction,boolean) - */ - public boolean setSafeMode(HdfsConstants.SafeModeAction action) - throws IOException { - return setSafeMode(action, false); - } - - /** - * Enter, leave or get safe mode. - * - * @param action - * One of SafeModeAction.ENTER, SafeModeAction.LEAVE and - * SafeModeAction.GET - * @param isChecked - * If true check only for Active NNs status, else check first NN's - * status - * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean) - */ - public boolean setSafeMode(HdfsConstants.SafeModeAction action, - boolean isChecked) throws IOException { - return dfs.setSafeMode(action, isChecked); - } - - /** - * Save namespace image. - * - * @param timeWindow NameNode can ignore this command if the latest - * checkpoint was done within the given time period (in - * seconds). - * @return true if a new checkpoint has been made - * @see ClientProtocol#saveNamespace(long, long) - */ - public boolean saveNamespace(long timeWindow, long txGap) throws IOException { - return dfs.saveNamespace(timeWindow, txGap); - } - - /** - * Save namespace image. NameNode always does the checkpoint. - */ - public void saveNamespace() throws IOException { - saveNamespace(0, 0); - } - - /** - * Rolls the edit log on the active NameNode. - * Requires super-user privileges. - * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits() - * @return the transaction ID of the newly created segment - */ - public long rollEdits() throws AccessControlException, IOException { - return dfs.rollEdits(); - } - - /** - * enable/disable/check restoreFaileStorage - * - * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg) - */ - public boolean restoreFailedStorage(String arg) - throws AccessControlException, IOException { - return dfs.restoreFailedStorage(arg); - } - - - /** - * Refreshes the list of hosts and excluded hosts from the configured - * files. - */ - public void refreshNodes() throws IOException { - dfs.refreshNodes(); - } - - /** - * Finalize previously upgraded files system state. - * @throws IOException - */ - public void finalizeUpgrade() throws IOException { - dfs.finalizeUpgrade(); - } - - /** - * Rolling upgrade: prepare/finalize/query. - */ - public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) - throws IOException { - return dfs.rollingUpgrade(action); - } - - /* - * Requests the namenode to dump data strcutures into specified - * file. - */ - public void metaSave(String pathname) throws IOException { - dfs.metaSave(pathname); - } - - @Override - public FsServerDefaults getServerDefaults() throws IOException { - return dfs.getServerDefaults(); - } - - /** - * Returns the stat information about the file. - * @throws FileNotFoundException if the file does not exist. - */ - @Override - public FileStatus getFileStatus(Path f) throws IOException { - statistics.incrementReadOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FileStatus doCall(final Path p) throws IOException, - UnresolvedLinkException { - HdfsFileStatus fi = dfs.getFileInfo(getPathName(p)); - if (fi != null) { - return fi.makeQualified(getUri(), p); - } else { - throw new FileNotFoundException("File does not exist: " + p); - } - } - @Override - public FileStatus next(final FileSystem fs, final Path p) - throws IOException { - return fs.getFileStatus(p); - } - }.resolve(this, absF); - } - - @SuppressWarnings("deprecation") - @Override - public void createSymlink(final Path target, final Path link, - final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, - IOException { - if (!FileSystem.areSymlinksEnabled()) { - throw new UnsupportedOperationException("Symlinks not supported"); - } - statistics.incrementWriteOps(1); - final Path absF = fixRelativePart(link); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException, - UnresolvedLinkException { - dfs.createSymlink(target.toString(), getPathName(p), createParent); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - fs.createSymlink(target, p, createParent); - return null; - } - }.resolve(this, absF); - } - - @Override - public boolean supportsSymlinks() { - return true; - } - - @Override - public FileStatus getFileLinkStatus(final Path f) - throws AccessControlException, FileNotFoundException, - UnsupportedFileSystemException, IOException { - statistics.incrementReadOps(1); - final Path absF = fixRelativePart(f); - FileStatus status = new FileSystemLinkResolver() { - @Override - public FileStatus doCall(final Path p) throws IOException, - UnresolvedLinkException { - HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p)); - if (fi != null) { - return fi.makeQualified(getUri(), p); - } else { - throw new FileNotFoundException("File does not exist: " + p); - } - } - @Override - public FileStatus next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getFileLinkStatus(p); - } - }.resolve(this, absF); - // Fully-qualify the symlink - if (status.isSymlink()) { - Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(), - status.getPath(), status.getSymlink()); - status.setSymlink(targetQual); - } - return status; - } - - @Override - public Path getLinkTarget(final Path f) throws AccessControlException, - FileNotFoundException, UnsupportedFileSystemException, IOException { - statistics.incrementReadOps(1); - final Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public Path doCall(final Path p) throws IOException, - UnresolvedLinkException { - HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p)); - if (fi != null) { - return fi.makeQualified(getUri(), p).getSymlink(); - } else { - throw new FileNotFoundException("File does not exist: " + p); - } - } - @Override - public Path next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getLinkTarget(p); - } - }.resolve(this, absF); - } - - @Override - protected Path resolveLink(Path f) throws IOException { - statistics.incrementReadOps(1); - String target = dfs.getLinkTarget(getPathName(fixRelativePart(f))); - if (target == null) { - throw new FileNotFoundException("File does not exist: " + f.toString()); - } - return new Path(target); - } - - @Override - public FileChecksum getFileChecksum(Path f) throws IOException { - statistics.incrementReadOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FileChecksum doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE); - } - - @Override - public FileChecksum next(final FileSystem fs, final Path p) - throws IOException { - return fs.getFileChecksum(p); - } - }.resolve(this, absF); - } - - @Override - public FileChecksum getFileChecksum(Path f, final long length) - throws IOException { - statistics.incrementReadOps(1); - Path absF = fixRelativePart(f); - return new FileSystemLinkResolver() { - @Override - public FileChecksum doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.getFileChecksum(getPathName(p), length); - } - - @Override - public FileChecksum next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - return ((DistributedFileSystem) fs).getFileChecksum(p, length); - } else { - throw new UnsupportedFileSystemException( - "getFileChecksum(Path, long) is not supported by " - + fs.getClass().getSimpleName()); - } - } - }.resolve(this, absF); - } - - @Override - public void setPermission(Path p, final FsPermission permission - ) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(p); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.setPermission(getPathName(p), permission); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - fs.setPermission(p, permission); - return null; - } - }.resolve(this, absF); - } - - @Override - public void setOwner(Path p, final String username, final String groupname - ) throws IOException { - if (username == null && groupname == null) { - throw new IOException("username == null && groupname == null"); - } - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(p); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.setOwner(getPathName(p), username, groupname); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - fs.setOwner(p, username, groupname); - return null; - } - }.resolve(this, absF); - } - - @Override - public void setTimes(Path p, final long mtime, final long atime - ) throws IOException { - statistics.incrementWriteOps(1); - Path absF = fixRelativePart(p); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.setTimes(getPathName(p), mtime, atime); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - fs.setTimes(p, mtime, atime); - return null; - } - }.resolve(this, absF); - } - - - @Override - protected int getDefaultPort() { - return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT; - } - - @Override - public Token getDelegationToken(String renewer) - throws IOException { - Token result = - dfs.getDelegationToken(renewer == null ? null : new Text(renewer)); - return result; - } - - /** - * Requests the namenode to tell all datanodes to use a new, non-persistent - * bandwidth value for dfs.balance.bandwidthPerSec. - * The bandwidth parameter is the max number of bytes per second of network - * bandwidth to be used by a datanode during balancing. - * - * @param bandwidth Balancer bandwidth in bytes per second for all datanodes. - * @throws IOException - */ - public void setBalancerBandwidth(long bandwidth) throws IOException { - dfs.setBalancerBandwidth(bandwidth); - } - - /** - * Get a canonical service name for this file system. If the URI is logical, - * the hostname part of the URI will be returned. - * @return a service string that uniquely identifies this file system. - */ - @Override - public String getCanonicalServiceName() { - return dfs.getCanonicalServiceName(); - } - - @Override - protected URI canonicalizeUri(URI uri) { - if (HAUtilClient.isLogicalUri(getConf(), uri)) { - // Don't try to DNS-resolve logical URIs, since the 'authority' - // portion isn't a proper hostname - return uri; - } else { - return NetUtils.getCanonicalUri(uri, getDefaultPort()); - } - } - - /** - * Utility function that returns if the NameNode is in safemode or not. In HA - * mode, this API will return only ActiveNN's safemode status. - * - * @return true if NameNode is in safemode, false otherwise. - * @throws IOException - * when there is an issue communicating with the NameNode - */ - public boolean isInSafeMode() throws IOException { - return setSafeMode(SafeModeAction.SAFEMODE_GET, true); - } - - /** @see HdfsAdmin#allowSnapshot(Path) */ - public void allowSnapshot(final Path path) throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.allowSnapshot(getPathName(p)); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - myDfs.allowSnapshot(p); - } else { - throw new UnsupportedOperationException("Cannot perform snapshot" - + " operations on a symlink to a non-DistributedFileSystem: " - + path + " -> " + p); - } - return null; - } - }.resolve(this, absF); - } - - /** @see HdfsAdmin#disallowSnapshot(Path) */ - public void disallowSnapshot(final Path path) throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.disallowSnapshot(getPathName(p)); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - myDfs.disallowSnapshot(p); - } else { - throw new UnsupportedOperationException("Cannot perform snapshot" - + " operations on a symlink to a non-DistributedFileSystem: " - + path + " -> " + p); - } - return null; - } - }.resolve(this, absF); - } - - @Override - public Path createSnapshot(final Path path, final String snapshotName) - throws IOException { - Path absF = fixRelativePart(path); - return new FileSystemLinkResolver() { - @Override - public Path doCall(final Path p) - throws IOException, UnresolvedLinkException { - return new Path(dfs.createSnapshot(getPathName(p), snapshotName)); - } - - @Override - public Path next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - return myDfs.createSnapshot(p); - } else { - throw new UnsupportedOperationException("Cannot perform snapshot" - + " operations on a symlink to a non-DistributedFileSystem: " - + path + " -> " + p); - } - } - }.resolve(this, absF); - } - - @Override - public void renameSnapshot(final Path path, final String snapshotOldName, - final String snapshotNewName) throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.renameSnapshot(getPathName(p), snapshotOldName, snapshotNewName); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - myDfs.renameSnapshot(p, snapshotOldName, snapshotNewName); - } else { - throw new UnsupportedOperationException("Cannot perform snapshot" - + " operations on a symlink to a non-DistributedFileSystem: " - + path + " -> " + p); - } - return null; - } - }.resolve(this, absF); - } - - /** - * @return All the snapshottable directories - * @throws IOException - */ - public SnapshottableDirectoryStatus[] getSnapshottableDirListing() - throws IOException { - return dfs.getSnapshottableDirListing(); - } - - @Override - public void deleteSnapshot(final Path snapshotDir, final String snapshotName) - throws IOException { - Path absF = fixRelativePart(snapshotDir); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) - throws IOException, UnresolvedLinkException { - dfs.deleteSnapshot(getPathName(p), snapshotName); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - myDfs.deleteSnapshot(p, snapshotName); - } else { - throw new UnsupportedOperationException("Cannot perform snapshot" - + " operations on a symlink to a non-DistributedFileSystem: " - + snapshotDir + " -> " + p); - } - return null; - } - }.resolve(this, absF); - } - - /** - * Get the difference between two snapshots, or between a snapshot and the - * current tree of a directory. - * - * @see DFSClient#getSnapshotDiffReport(String, String, String) - */ - public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir, - final String fromSnapshot, final String toSnapshot) throws IOException { - Path absF = fixRelativePart(snapshotDir); - return new FileSystemLinkResolver() { - @Override - public SnapshotDiffReport doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot, - toSnapshot); - } - - @Override - public SnapshotDiffReport next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot); - } else { - throw new UnsupportedOperationException("Cannot perform snapshot" - + " operations on a symlink to a non-DistributedFileSystem: " - + snapshotDir + " -> " + p); - } - return null; - } - }.resolve(this, absF); - } - - /** - * Get the close status of a file - * @param src The path to the file - * - * @return return true if file is closed - * @throws FileNotFoundException if the file does not exist. - * @throws IOException If an I/O error occurred - */ - public boolean isFileClosed(final Path src) throws IOException { - Path absF = fixRelativePart(src); - return new FileSystemLinkResolver() { - @Override - public Boolean doCall(final Path p) - throws IOException, UnresolvedLinkException { - return dfs.isFileClosed(getPathName(p)); - } - - @Override - public Boolean next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem)fs; - return myDfs.isFileClosed(p); - } else { - throw new UnsupportedOperationException("Cannot call isFileClosed" - + " on a symlink to a non-DistributedFileSystem: " - + src + " -> " + p); - } - } - }.resolve(this, absF); - } - - /** - * @see {@link #addCacheDirective(CacheDirectiveInfo, EnumSet)} - */ - public long addCacheDirective(CacheDirectiveInfo info) throws IOException { - return addCacheDirective(info, EnumSet.noneOf(CacheFlag.class)); - } - - /** - * Add a new CacheDirective. - * - * @param info Information about a directive to add. - * @param flags {@link CacheFlag}s to use for this operation. - * @return the ID of the directive that was created. - * @throws IOException if the directive could not be added - */ - public long addCacheDirective( - CacheDirectiveInfo info, EnumSet flags) throws IOException { - Preconditions.checkNotNull(info.getPath()); - Path path = new Path(getPathName(fixRelativePart(info.getPath()))). - makeQualified(getUri(), getWorkingDirectory()); - return dfs.addCacheDirective( - new CacheDirectiveInfo.Builder(info). - setPath(path). - build(), - flags); - } - - /** - * @see {@link #modifyCacheDirective(CacheDirectiveInfo, EnumSet)} - */ - public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException { - modifyCacheDirective(info, EnumSet.noneOf(CacheFlag.class)); - } - - /** - * Modify a CacheDirective. - * - * @param info Information about the directive to modify. You must set the ID - * to indicate which CacheDirective you want to modify. - * @param flags {@link CacheFlag}s to use for this operation. - * @throws IOException if the directive could not be modified - */ - public void modifyCacheDirective( - CacheDirectiveInfo info, EnumSet flags) throws IOException { - if (info.getPath() != null) { - info = new CacheDirectiveInfo.Builder(info). - setPath(new Path(getPathName(fixRelativePart(info.getPath()))). - makeQualified(getUri(), getWorkingDirectory())).build(); - } - dfs.modifyCacheDirective(info, flags); - } - - /** - * Remove a CacheDirectiveInfo. - * - * @param id identifier of the CacheDirectiveInfo to remove - * @throws IOException if the directive could not be removed - */ - public void removeCacheDirective(long id) - throws IOException { - dfs.removeCacheDirective(id); - } - - /** - * List cache directives. Incrementally fetches results from the server. - * - * @param filter Filter parameters to use when listing the directives, null to - * list all directives visible to us. - * @return A RemoteIterator which returns CacheDirectiveInfo objects. - */ - public RemoteIterator listCacheDirectives( - CacheDirectiveInfo filter) throws IOException { - if (filter == null) { - filter = new CacheDirectiveInfo.Builder().build(); - } - if (filter.getPath() != null) { - filter = new CacheDirectiveInfo.Builder(filter). - setPath(new Path(getPathName(fixRelativePart(filter.getPath())))). - build(); - } - final RemoteIterator iter = - dfs.listCacheDirectives(filter); - return new RemoteIterator() { - @Override - public boolean hasNext() throws IOException { - return iter.hasNext(); - } - - @Override - public CacheDirectiveEntry next() throws IOException { - // Although the paths we get back from the NameNode should always be - // absolute, we call makeQualified to add the scheme and authority of - // this DistributedFilesystem. - CacheDirectiveEntry desc = iter.next(); - CacheDirectiveInfo info = desc.getInfo(); - Path p = info.getPath().makeQualified(getUri(), getWorkingDirectory()); - return new CacheDirectiveEntry( - new CacheDirectiveInfo.Builder(info).setPath(p).build(), - desc.getStats()); - } - }; - } - - /** - * Add a cache pool. - * - * @param info - * The request to add a cache pool. - * @throws IOException - * If the request could not be completed. - */ - public void addCachePool(CachePoolInfo info) throws IOException { - CachePoolInfo.validate(info); - dfs.addCachePool(info); - } - - /** - * Modify an existing cache pool. - * - * @param info - * The request to modify a cache pool. - * @throws IOException - * If the request could not be completed. - */ - public void modifyCachePool(CachePoolInfo info) throws IOException { - CachePoolInfo.validate(info); - dfs.modifyCachePool(info); - } - - /** - * Remove a cache pool. - * - * @param poolName - * Name of the cache pool to remove. - * @throws IOException - * if the cache pool did not exist, or could not be removed. - */ - public void removeCachePool(String poolName) throws IOException { - CachePoolInfo.validateName(poolName); - dfs.removeCachePool(poolName); - } - - /** - * List all cache pools. - * - * @return A remote iterator from which you can get CachePoolEntry objects. - * Requests will be made as needed. - * @throws IOException - * If there was an error listing cache pools. - */ - public RemoteIterator listCachePools() throws IOException { - return dfs.listCachePools(); - } - - /** - * {@inheritDoc} - */ - @Override - public void modifyAclEntries(Path path, final List aclSpec) - throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.modifyAclEntries(getPathName(p), aclSpec); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) throws IOException { - fs.modifyAclEntries(p, aclSpec); - return null; - } - }.resolve(this, absF); - } - - /** - * {@inheritDoc} - */ - @Override - public void removeAclEntries(Path path, final List aclSpec) - throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.removeAclEntries(getPathName(p), aclSpec); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) throws IOException { - fs.removeAclEntries(p, aclSpec); - return null; - } - }.resolve(this, absF); - } - - /** - * {@inheritDoc} - */ - @Override - public void removeDefaultAcl(Path path) throws IOException { - final Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.removeDefaultAcl(getPathName(p)); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - fs.removeDefaultAcl(p); - return null; - } - }.resolve(this, absF); - } - - /** - * {@inheritDoc} - */ - @Override - public void removeAcl(Path path) throws IOException { - final Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.removeAcl(getPathName(p)); - return null; - } - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - fs.removeAcl(p); - return null; - } - }.resolve(this, absF); - } - - /** - * {@inheritDoc} - */ - @Override - public void setAcl(Path path, final List aclSpec) throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.setAcl(getPathName(p), aclSpec); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) throws IOException { - fs.setAcl(p, aclSpec); - return null; - } - }.resolve(this, absF); - } - - /** - * {@inheritDoc} - */ - @Override - public AclStatus getAclStatus(Path path) throws IOException { - final Path absF = fixRelativePart(path); - return new FileSystemLinkResolver() { - @Override - public AclStatus doCall(final Path p) throws IOException { - return dfs.getAclStatus(getPathName(p)); - } - @Override - public AclStatus next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getAclStatus(p); - } - }.resolve(this, absF); - } - - /* HDFS only */ - public void createEncryptionZone(final Path path, final String keyName) - throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException, - UnresolvedLinkException { - dfs.createEncryptionZone(getPathName(p), keyName); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem) fs; - myDfs.createEncryptionZone(p, keyName); - return null; - } else { - throw new UnsupportedOperationException( - "Cannot call createEncryptionZone" - + " on a symlink to a non-DistributedFileSystem: " + path - + " -> " + p); - } - } - }.resolve(this, absF); - } - - /* HDFS only */ - public EncryptionZone getEZForPath(final Path path) - throws IOException { - Preconditions.checkNotNull(path); - Path absF = fixRelativePart(path); - return new FileSystemLinkResolver() { - @Override - public EncryptionZone doCall(final Path p) throws IOException, - UnresolvedLinkException { - return dfs.getEZForPath(getPathName(p)); - } - - @Override - public EncryptionZone next(final FileSystem fs, final Path p) - throws IOException { - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem myDfs = (DistributedFileSystem) fs; - return myDfs.getEZForPath(p); - } else { - throw new UnsupportedOperationException( - "Cannot call getEZForPath" - + " on a symlink to a non-DistributedFileSystem: " + path - + " -> " + p); - } - } - }.resolve(this, absF); - } - - /* HDFS only */ - public RemoteIterator listEncryptionZones() - throws IOException { - return dfs.listEncryptionZones(); - } - - @Override - public void setXAttr(Path path, final String name, final byte[] value, - final EnumSet flag) throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - - @Override - public Void doCall(final Path p) throws IOException { - dfs.setXAttr(getPathName(p), name, value, flag); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) throws IOException { - fs.setXAttr(p, name, value, flag); - return null; - } - }.resolve(this, absF); - } - - @Override - public byte[] getXAttr(Path path, final String name) throws IOException { - final Path absF = fixRelativePart(path); - return new FileSystemLinkResolver() { - @Override - public byte[] doCall(final Path p) throws IOException { - return dfs.getXAttr(getPathName(p), name); - } - @Override - public byte[] next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getXAttr(p, name); - } - }.resolve(this, absF); - } - - @Override - public Map getXAttrs(Path path) throws IOException { - final Path absF = fixRelativePart(path); - return new FileSystemLinkResolver>() { - @Override - public Map doCall(final Path p) throws IOException { - return dfs.getXAttrs(getPathName(p)); - } - @Override - public Map next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getXAttrs(p); - } - }.resolve(this, absF); - } - - @Override - public Map getXAttrs(Path path, final List names) - throws IOException { - final Path absF = fixRelativePart(path); - return new FileSystemLinkResolver>() { - @Override - public Map doCall(final Path p) throws IOException { - return dfs.getXAttrs(getPathName(p), names); - } - @Override - public Map next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.getXAttrs(p, names); - } - }.resolve(this, absF); - } - - @Override - public List listXAttrs(Path path) - throws IOException { - final Path absF = fixRelativePart(path); - return new FileSystemLinkResolver>() { - @Override - public List doCall(final Path p) throws IOException { - return dfs.listXAttrs(getPathName(p)); - } - @Override - public List next(final FileSystem fs, final Path p) - throws IOException, UnresolvedLinkException { - return fs.listXAttrs(p); - } - }.resolve(this, absF); - } - - @Override - public void removeXAttr(Path path, final String name) throws IOException { - Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.removeXAttr(getPathName(p), name); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) throws IOException { - fs.removeXAttr(p, name); - return null; - } - }.resolve(this, absF); - } - - @Override - public void access(Path path, final FsAction mode) throws IOException { - final Path absF = fixRelativePart(path); - new FileSystemLinkResolver() { - @Override - public Void doCall(final Path p) throws IOException { - dfs.checkAccess(getPathName(p), mode); - return null; - } - - @Override - public Void next(final FileSystem fs, final Path p) - throws IOException { - fs.access(p, mode); - return null; - } - }.resolve(this, absF); - } - - @Override - public Token[] addDelegationTokens( - final String renewer, Credentials credentials) throws IOException { - Token[] tokens = super.addDelegationTokens(renewer, credentials); - if (dfs.isHDFSEncryptionEnabled()) { - KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension = - KeyProviderDelegationTokenExtension. - createKeyProviderDelegationTokenExtension(dfs.getKeyProvider()); - Token[] kpTokens = keyProviderDelegationTokenExtension. - addDelegationTokens(renewer, credentials); - if (tokens != null && kpTokens != null) { - Token[] all = new Token[tokens.length + kpTokens.length]; - System.arraycopy(tokens, 0, all, 0, tokens.length); - System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length); - tokens = all; - } else { - tokens = (tokens != null) ? tokens : kpTokens; - } - } - return tokens; - } - - public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { - return dfs.getInotifyEventStream(); - } - - public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) - throws IOException { - return dfs.getInotifyEventStream(lastReadTxid); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java deleted file mode 100644 index 77bed1a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.client.impl; - -import java.io.IOException; -import java.util.NoSuchElementException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; - -/** - * Provides an iterator interface for listCorruptFileBlocks. - * This class is used by DistributedFileSystem and Hdfs. - */ -@InterfaceAudience.Private -public class CorruptFileBlockIterator implements RemoteIterator { - private final DFSClient dfs; - private final String path; - - private String[] files = null; - private int fileIdx = 0; - private String cookie = null; - private Path nextPath = null; - - private int callsMade = 0; - - public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException { - this.dfs = dfs; - this.path = path2String(path); - loadNext(); - } - - /** - * @return the number of calls made to the DFSClient. - * This is for debugging and testing purposes. - */ - public int getCallsMade() { - return callsMade; - } - - private String path2String(Path path) { - return path.toUri().getPath(); - } - - private Path string2Path(String string) { - return new Path(string); - } - - private void loadNext() throws IOException { - if (files == null || fileIdx >= files.length) { - CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie); - files = cfb.getFiles(); - cookie = cfb.getCookie(); - fileIdx = 0; - callsMade++; - } - - if (fileIdx >= files.length) { - // received an empty response - // there are no more corrupt file blocks - nextPath = null; - } else { - nextPath = string2Path(files[fileIdx]); - fileIdx++; - } - } - - - @Override - public boolean hasNext() { - return nextPath != null; - } - - - @Override - public Path next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException("No more corrupt file blocks"); - } - - Path result = nextPath; - loadNext(); - - return result; - } -} \ No newline at end of file