Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 42E8A200BDA for ; Mon, 28 Nov 2016 22:16:02 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 42038160B25; Mon, 28 Nov 2016 21:16:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 921FC160B0D for ; Mon, 28 Nov 2016 22:16:00 +0100 (CET) Received: (qmail 47672 invoked by uid 500); 28 Nov 2016 21:15:51 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 46501 invoked by uid 99); 28 Nov 2016 21:15:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Nov 2016 21:15:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 95B31F17E3; Mon, 28 Nov 2016 21:15:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Mon, 28 Nov 2016 21:16:15 -0000 Message-Id: <1f87559822704450b3f2d6ef23f60574@git.apache.org> In-Reply-To: <3022f5161bb4468c92a2825216e9111f@git.apache.org> References: <3022f5161bb4468c92a2825216e9111f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/63] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe) archived-at: Mon, 28 Nov 2016 21:16:02 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/a379904d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java new file mode 100644 index 0000000..aa5edf1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/CoreFileSystem.java @@ -0,0 +1,915 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.common.tools; + +import com.google.common.base.Preconditions; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.SliderXmlConfKeys; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.core.exceptions.ErrorStrings; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; +import org.apache.slider.core.persist.Filenames; +import org.apache.slider.core.persist.InstancePaths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Enumeration; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +import static org.apache.slider.common.SliderXmlConfKeys.CLUSTER_DIRECTORY_PERMISSIONS; +import static org.apache.slider.common.SliderXmlConfKeys.DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS; + +public class CoreFileSystem { + private static final Logger + log = LoggerFactory.getLogger(CoreFileSystem.class); + + private static final String UTF_8 = "UTF-8"; + + protected final FileSystem fileSystem; + protected final Configuration configuration; + + public CoreFileSystem(FileSystem fileSystem, Configuration configuration) { + Preconditions.checkNotNull(fileSystem, + "Cannot create a CoreFileSystem with a null FileSystem"); + Preconditions.checkNotNull(configuration, + "Cannot create a CoreFileSystem with a null Configuration"); + this.fileSystem = fileSystem; + this.configuration = configuration; + } + + public CoreFileSystem(Configuration configuration) throws IOException { + Preconditions.checkNotNull(configuration, + "Cannot create a CoreFileSystem with a null Configuration"); + this.fileSystem = FileSystem.get(configuration); + this.configuration = fileSystem.getConf(); + } + + /** + * Get the temp path for this cluster + * @param clustername name of the cluster + * @return path for temp files (is not purged) + */ + public Path getTempPathForCluster(String clustername) { + Path clusterDir = buildClusterDirPath(clustername); + return new Path(clusterDir, SliderKeys.TMP_DIR_PREFIX); + } + + /** + * Returns the underlying FileSystem for this object. + * + * @return filesystem + */ + public FileSystem getFileSystem() { + return fileSystem; + } + + @Override + public String toString() { + final StringBuilder sb = + new StringBuilder("CoreFileSystem{"); + sb.append("fileSystem=").append(fileSystem.getUri()); + sb.append('}'); + return sb.toString(); + } + + /** + * Build up the path string for a cluster instance -no attempt to + * create the directory is made + * + * @param clustername name of the cluster + * @return the path for persistent data + */ + public Path buildClusterDirPath(String clustername) { + Preconditions.checkNotNull(clustername); + Path path = getBaseApplicationPath(); + return new Path(path, SliderKeys.CLUSTER_DIRECTORY + "/" + clustername); + } + + /** + * Build up the path string for app def folder -no attempt to + * create the directory is made + * + * @param clustername name of the cluster + * @return the path for persistent data + */ + public Path buildAppDefDirPath(String clustername) { + Path path = buildClusterDirPath(clustername); + return new Path(path, SliderKeys.APP_DEF_DIR); + } + + /** + * Build up the path string for addon folder -no attempt to + * create the directory is made + * + * @param clustername name of the cluster + * @return the path for persistent data + */ + public Path buildAddonDirPath(String clustername, String addonId) { + Preconditions.checkNotNull(addonId); + Path path = buildClusterDirPath(clustername); + return new Path(path, SliderKeys.ADDONS_DIR + "/" + addonId); + } + + /** + * Build up the path string for package install location -no attempt to + * create the directory is made + * + * @return the path for persistent app package + */ + public Path buildPackageDirPath(String packageName, String packageVersion) { + Preconditions.checkNotNull(packageName); + Path path = getBaseApplicationPath(); + path = new Path(path, SliderKeys.PACKAGE_DIRECTORY + "/" + packageName); + if (SliderUtils.isSet(packageVersion)) { + path = new Path(path, packageVersion); + } + return path; + } + + /** + * Build up the path string for package install location -no attempt to + * create the directory is made + * + * @return the path for persistent app package + */ + public Path buildClusterSecurityDirPath(String clusterName) { + Preconditions.checkNotNull(clusterName); + Path path = buildClusterDirPath(clusterName); + return new Path(path, SliderKeys.SECURITY_DIR); + } + + /** + * Build up the path string for keytab install location -no attempt to + * create the directory is made + * + * @return the path for keytab + */ + public Path buildKeytabInstallationDirPath(String keytabFolder) { + Preconditions.checkNotNull(keytabFolder); + Path path = getBaseApplicationPath(); + return new Path(path, SliderKeys.KEYTAB_DIR + "/" + keytabFolder); + } + + /** + * Build up the path string for keytab install location -no attempt to + * create the directory is made + * + * @return the path for keytab installation location + */ + public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) { + Path homePath = getHomeDirectory(); + Path baseKeytabDir; + if (keytabDir != null) { + baseKeytabDir = new Path(homePath, keytabDir); + } else { + baseKeytabDir = new Path(buildClusterDirPath(clusterName), + SliderKeys.KEYTAB_DIR); + } + return keytabName == null ? baseKeytabDir : + new Path(baseKeytabDir, keytabName); + } + + /** + * Build up the path string for resource install location -no attempt to + * create the directory is made + * + * @return the path for resource + */ + public Path buildResourcePath(String resourceFolder) { + Preconditions.checkNotNull(resourceFolder); + Path path = getBaseApplicationPath(); + return new Path(path, SliderKeys.RESOURCE_DIR + "/" + resourceFolder); + } + + /** + * Build up the path string for resource install location -no attempt to + * create the directory is made + * + * @return the path for resource + */ + public Path buildResourcePath(String dirName, String fileName) { + Preconditions.checkNotNull(dirName); + Preconditions.checkNotNull(fileName); + Path path = getBaseApplicationPath(); + return new Path(path, SliderKeys.RESOURCE_DIR + "/" + dirName + "/" + fileName); + } + + /** + * Build up the path string for cluster resource install location -no + * attempt to create the directory is made + * + * @return the path for resource + */ + public Path buildClusterResourcePath(String clusterName, String component) { + Preconditions.checkNotNull(clusterName); + Path path = buildClusterDirPath(clusterName); + return new Path(path, SliderKeys.RESOURCE_DIR + "/" + component); + } + + /** + * Build up the path string for cluster resource install location -no + * attempt to create the directory is made + * + * @return the path for resource + */ + public Path buildClusterResourcePath(String clusterName) { + Preconditions.checkNotNull(clusterName); + Path path = buildClusterDirPath(clusterName); + return new Path(path, SliderKeys.RESOURCE_DIR); + } + + /** + * Create the Slider cluster path for a named cluster and all its subdirs + * This is a directory; a mkdirs() operation is executed + * to ensure that it is there. + * + * @param clustername name of the cluster + * @return the path to the cluster directory + * @throws java.io.IOException trouble + * @throws SliderException slider-specific exceptions + */ + public Path createClusterDirectories(String clustername, Configuration conf) + throws IOException, SliderException { + + + Path clusterDirectory = buildClusterDirPath(clustername); + InstancePaths instancePaths = new InstancePaths(clusterDirectory); + createClusterDirectories(instancePaths); + return clusterDirectory; + } + + /** + * Create the Slider cluster path for a named cluster and all its subdirs + * This is a directory; a mkdirs() operation is executed + * to ensure that it is there. + * + * @param instancePaths instance paths + * @throws IOException trouble + * @throws SliderException slider-specific exceptions + */ + public void createClusterDirectories(InstancePaths instancePaths) throws + IOException, SliderException { + Path instanceDir = instancePaths.instanceDir; + + verifyDirectoryNonexistent(instanceDir); + FsPermission clusterPerms = getInstanceDirectoryPermissions(); + createWithPermissions(instanceDir, clusterPerms); + createWithPermissions(instancePaths.snapshotConfPath, clusterPerms); + createWithPermissions(instancePaths.generatedConfPath, clusterPerms); + createWithPermissions(instancePaths.historyPath, clusterPerms); + createWithPermissions(instancePaths.tmpPathAM, clusterPerms); + + // Data Directory + String dataOpts = + configuration.get(SliderXmlConfKeys.DATA_DIRECTORY_PERMISSIONS, + SliderXmlConfKeys.DEFAULT_DATA_DIRECTORY_PERMISSIONS); + log.debug("Setting data directory permissions to {}", dataOpts); + createWithPermissions(instancePaths.dataPath, new FsPermission(dataOpts)); + + } + + /** + * Create a directory with the given permissions. + * + * @param dir directory + * @param clusterPerms cluster permissions + * @throws IOException IO problem + * @throws BadClusterStateException any cluster state problem + */ + public void createWithPermissions(Path dir, FsPermission clusterPerms) throws + IOException, + BadClusterStateException { + if (fileSystem.isFile(dir)) { + // HADOOP-9361 shows some filesystems don't correctly fail here + throw new BadClusterStateException( + "Cannot create a directory over a file %s", dir); + } + log.debug("mkdir {} with perms {}", dir, clusterPerms); + //no mask whatoever + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); + fileSystem.mkdirs(dir, clusterPerms); + //and force set it anyway just to make sure + fileSystem.setPermission(dir, clusterPerms); + } + + /** + * Get the permissions of a path + * + * @param path path to check + * @return the permissions + * @throws IOException any IO problem (including file not found) + */ + public FsPermission getPathPermissions(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + return status.getPermission(); + } + + public FsPermission getInstanceDirectoryPermissions() { + String clusterDirPermsOct = + configuration.get(CLUSTER_DIRECTORY_PERMISSIONS, + DEFAULT_CLUSTER_DIRECTORY_PERMISSIONS); + return new FsPermission(clusterDirPermsOct); + } + + /** + * Verify that the cluster directory is not present + * + * @param clustername name of the cluster + * @param clusterDirectory actual directory to look for + * @throws IOException trouble with FS + * @throws SliderException If the directory exists + */ + public void verifyClusterDirectoryNonexistent(String clustername, + Path clusterDirectory) + throws IOException, SliderException { + if (fileSystem.exists(clusterDirectory)) { + throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS, + ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername, + clusterDirectory); + } + } + /** + * Verify that the given directory is not present + * + * @param clusterDirectory actual directory to look for + * @throws IOException trouble with FS + * @throws SliderException If the directory exists + */ + public void verifyDirectoryNonexistent(Path clusterDirectory) throws + IOException, + SliderException { + if (fileSystem.exists(clusterDirectory)) { + + log.error("Dir {} exists: {}", + clusterDirectory, + listFSDir(clusterDirectory)); + throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS, + ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS, + clusterDirectory); + } + } + + /** + * Verify that a user has write access to a directory. + * It does this by creating then deleting a temp file + * + * @param dirPath actual directory to look for + * @throws FileNotFoundException file not found + * @throws IOException trouble with FS + * @throws BadClusterStateException if the directory is not writeable + */ + public void verifyDirectoryWriteAccess(Path dirPath) throws IOException, + SliderException { + verifyPathExists(dirPath); + Path tempFile = new Path(dirPath, "tmp-file-for-checks"); + try { + FSDataOutputStream out ; + out = fileSystem.create(tempFile, true); + IOUtils.closeStream(out); + fileSystem.delete(tempFile, false); + } catch (IOException e) { + log.warn("Failed to create file {}: {}", tempFile, e); + throw new BadClusterStateException(e, + "Unable to write to directory %s : %s", dirPath, e.toString()); + } + } + + /** + * Verify that a path exists + * @param path path to check + * @throws FileNotFoundException file not found + * @throws IOException trouble with FS + */ + public void verifyPathExists(Path path) throws IOException { + if (!fileSystem.exists(path)) { + throw new FileNotFoundException(path.toString()); + } + } + + /** + * Verify that a path exists + * @param path path to check + * @throws FileNotFoundException file not found or is not a file + * @throws IOException trouble with FS + */ + public void verifyFileExists(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + + if (!status.isFile()) { + throw new FileNotFoundException("Not a file: " + path.toString()); + } + } + + /** + * Given a path, check if it exists and is a file + * + * @param path + * absolute path to the file to check + * @returns true if and only if path exists and is a file, false for all other + * reasons including if file check throws IOException + */ + public boolean isFile(Path path) { + boolean isFile = false; + try { + FileStatus status = fileSystem.getFileStatus(path); + if (status.isFile()) { + isFile = true; + } + } catch (IOException e) { + // ignore, isFile is already set to false + } + return isFile; + } + + /** + * Verify that a file exists in the zip file given by path + * @param path path to zip file + * @param file file expected to be in zip + * @throws FileNotFoundException file not found or is not a zip file + * @throws IOException trouble with FS + */ + public void verifyFileExistsInZip(Path path, String file) throws IOException { + fileSystem.copyToLocalFile(path, new Path("/tmp")); + File dst = new File((new Path("/tmp", path.getName())).toString()); + Enumeration entries; + ZipFile zipFile = new ZipFile(dst); + boolean found = false; + + try { + entries = zipFile.entries(); + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + String nm = entry.getName(); + if (nm.endsWith(file)) { + found = true; + break; + } + } + } finally { + zipFile.close(); + } + dst.delete(); + if (!found) throw new FileNotFoundException("file: " + file + " not found in " + path); + log.info("Verification of " + path + " passed"); + } + /** + * Create the application-instance specific temporary directory + * in the DFS + * + * @param clustername name of the cluster + * @param subdir application ID + * @return the path; this directory will already have been created + */ + public Path createAppInstanceTempPath(String clustername, String subdir) + throws IOException { + Path tmp = getTempPathForCluster(clustername); + Path instancePath = new Path(tmp, subdir); + fileSystem.mkdirs(instancePath); + return instancePath; + } + + /** + * Create the application-instance specific temporary directory + * in the DFS + * + * @param clustername name of the cluster + * @return the path; this directory will already have been deleted + */ + public Path purgeAppInstanceTempFiles(String clustername) throws + IOException { + Path tmp = getTempPathForCluster(clustername); + fileSystem.delete(tmp, true); + return tmp; + } + + /** + * Get the base path + * + * @return the base path optionally configured by + * {@link SliderXmlConfKeys#KEY_SLIDER_BASE_PATH} + */ + public Path getBaseApplicationPath() { + String configuredBasePath = configuration.get(SliderXmlConfKeys.KEY_SLIDER_BASE_PATH); + return configuredBasePath != null ? new Path(configuredBasePath) : + new Path(getHomeDirectory(), SliderKeys.SLIDER_BASE_DIRECTORY); + } + + /** + * Get slider dependency parent dir in HDFS + * + * @return the parent dir path of slider.tar.gz in HDFS + */ + public Path getDependencyPath() { + String parentDir = (SliderUtils.isHdp()) ? SliderKeys.SLIDER_DEPENDENCY_HDP_PARENT_DIR + + SliderKeys.SLIDER_DEPENDENCY_DIR + : SliderKeys.SLIDER_DEPENDENCY_DIR; + Path dependencyPath = new Path(String.format(parentDir, + SliderUtils.getSliderVersion())); + return dependencyPath; + } + + /** + * Get slider.tar.gz absolute filepath in HDFS + * + * @return the absolute path to slider.tar.gz in HDFS + */ + public Path getDependencyTarGzip() { + Path dependencyLibAmPath = getDependencyPath(); + Path dependencyLibTarGzip = new Path( + dependencyLibAmPath.toUri().toString(), + SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_NAME + + SliderKeys.SLIDER_DEPENDENCY_TAR_GZ_FILE_EXT); + return dependencyLibTarGzip; + } + + public Path getHomeDirectory() { + return fileSystem.getHomeDirectory(); + } + + public boolean maybeAddImagePath(Map localResources, + Path imagePath) throws IOException { + if (imagePath != null) { + LocalResource resource = createAmResource(imagePath, + LocalResourceType.ARCHIVE); + localResources.put(SliderKeys.LOCAL_TARBALL_INSTALL_SUBDIR, resource); + return true; + } else { + return false; + } + } + + public boolean maybeAddImagePath(Map localResources, + String imagePath) throws IOException { + + return imagePath != null && + maybeAddImagePath(localResources, new Path(imagePath)); + } + + + + + /** + * Create an AM resource from the + * + * @param destPath dest path in filesystem + * @param resourceType resource type + * @return the resource set up wih application-level visibility and the + * timestamp & size set from the file stats. + */ + public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException { + FileStatus destStatus = fileSystem.getFileStatus(destPath); + LocalResource amResource = Records.newRecord(LocalResource.class); + amResource.setType(resourceType); + // Set visibility of the resource + // Setting to most private option + amResource.setVisibility(LocalResourceVisibility.APPLICATION); + // Set the resource to be copied over + amResource.setResource(ConverterUtils.getYarnUrlFromPath(fileSystem + .resolvePath(destStatus.getPath()))); + // Set timestamp and length of file so that the framework + // can do basic sanity checks for the local resource + // after it has been copied over to ensure it is the same + // resource the client intended to use with the application + amResource.setTimestamp(destStatus.getModificationTime()); + amResource.setSize(destStatus.getLen()); + return amResource; + } + + /** + * Register all files under a fs path as a directory to push out + * + * @param srcDir src dir + * @param destRelativeDir dest dir (no trailing /) + * @return the map of entries + */ + public Map submitDirectory(Path srcDir, String destRelativeDir) throws IOException { + //now register each of the files in the directory to be + //copied to the destination + FileStatus[] fileset = fileSystem.listStatus(srcDir); + Map localResources = + new HashMap(fileset.length); + for (FileStatus entry : fileset) { + + LocalResource resource = createAmResource(entry.getPath(), + LocalResourceType.FILE); + String relativePath = destRelativeDir + "/" + entry.getPath().getName(); + localResources.put(relativePath, resource); + } + return localResources; + } + + /** + * Submit a JAR containing a specific class, returning + * the resource to be mapped in + * + * @param clazz class to look for + * @param subdir subdirectory (expected to end in a "/") + * @param jarName At the destination + * @return the local resource ref + * @throws IOException trouble copying to HDFS + */ + public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName) + throws IOException, SliderException { + File localFile = SliderUtils.findContainingJarOrFail(clazz); + return submitFile(localFile, tempPath, subdir, jarName); + } + + /** + * Submit a local file to the filesystem references by the instance's cluster + * filesystem + * + * @param localFile filename + * @param subdir subdirectory (expected to end in a "/") + * @param destFileName destination filename + * @return the local resource ref + * @throws IOException trouble copying to HDFS + */ + public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName) + throws IOException { + Path src = new Path(localFile.toString()); + Path subdirPath = new Path(tempPath, subdir); + fileSystem.mkdirs(subdirPath); + Path destPath = new Path(subdirPath, destFileName); + log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), destPath); + + fileSystem.copyFromLocalFile(false, true, src, destPath); + + // Set the type of resource - file or archive + // archives are untarred at destination + // we don't need the jar file to be untarred for now + return createAmResource(destPath, LocalResourceType.FILE); + } + + /** + * Submit the AM tar.gz resource referenced by the instance's cluster + * filesystem. Also, update the providerResources object with the new + * resource. + * + * @param providerResources + * the provider resource map to be updated + * @throws IOException + * trouble copying to HDFS + */ + public void submitTarGzipAndUpdate( + Map providerResources) throws IOException, + BadClusterStateException { + Path dependencyLibTarGzip = getDependencyTarGzip(); + LocalResource lc = createAmResource(dependencyLibTarGzip, + LocalResourceType.ARCHIVE); + providerResources.put(SliderKeys.SLIDER_DEPENDENCY_LOCALIZED_DIR_LINK, lc); + } + + /** + * Copy local file(s) to destination HDFS directory. If {@code localPath} is a + * local directory then all files matching the {@code filenameFilter} + * (optional) are copied, otherwise {@code filenameFilter} is ignored. + * + * @param localPath + * a local file or directory path + * @param filenameFilter + * if {@code localPath} is a directory then filenameFilter is used as + * a filter (if specified) + * @param destDir + * the destination HDFS directory where the file(s) should be copied + * @param fp + * file permissions of all the directories and files that will be + * created in this api + * @throws IOException + */ + public void copyLocalFilesToHdfs(File localPath, + FilenameFilter filenameFilter, Path destDir, FsPermission fp) + throws IOException { + if (localPath == null || destDir == null) { + throw new IOException("Either localPath or destDir is null"); + } + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, + "000"); + fileSystem.mkdirs(destDir, fp); + if (localPath.isDirectory()) { + // copy all local files under localPath to destDir (honoring filename + // filter if provided + File[] localFiles = localPath.listFiles(filenameFilter); + Path[] localFilePaths = new Path[localFiles.length]; + int i = 0; + for (File localFile : localFiles) { + localFilePaths[i++] = new Path(localFile.getPath()); + } + log.info("Copying {} files from {} to {}", i, localPath.toURI(), + destDir.toUri()); + fileSystem.copyFromLocalFile(false, true, localFilePaths, destDir); + } else { + log.info("Copying file {} to {}", localPath.toURI(), destDir.toUri()); + fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()), + destDir); + } + // set permissions for all the files created in the destDir + fileSystem.setPermission(destDir, fp); + } + + public void copyLocalFileToHdfs(File localPath, + Path destPath, FsPermission fp) + throws IOException { + if (localPath == null || destPath == null) { + throw new IOException("Either localPath or destPath is null"); + } + fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, + "000"); + fileSystem.mkdirs(destPath.getParent(), fp); + log.info("Copying file {} to {}", localPath.toURI(), destPath.toUri()); + + fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()), + destPath); + // set file permissions of the destPath + fileSystem.setPermission(destPath, fp); + } + + public void copyHdfsFileToLocal(Path hdfsPath, File destFile) + throws IOException { + if (hdfsPath == null || destFile == null) { + throw new IOException("Either hdfsPath or destPath is null"); + } + log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI()); + + Path destPath = new Path(destFile.getPath()); + fileSystem.copyToLocalFile(hdfsPath, destPath); + } + + /** + * list entries in a filesystem directory + * + * @param path directory + * @return a listing, one to a line + * @throws IOException + */ + public String listFSDir(Path path) throws IOException { + FileStatus[] stats = fileSystem.listStatus(path); + StringBuilder builder = new StringBuilder(); + for (FileStatus stat : stats) { + builder.append(stat.getPath().toString()) + .append("\t") + .append(stat.getLen()) + .append("\n"); + } + return builder.toString(); + } + + /** + * List all application instances persisted for this user, giving the + * path. The instance name is the last element in the path + * @return a possibly empty map of application instance names to paths + */ + public Map listPersistentInstances() throws IOException { + FileSystem fs = getFileSystem(); + Path path = new Path(getBaseApplicationPath(), SliderKeys.CLUSTER_DIRECTORY); + log.debug("Looking for all persisted application at {}", path.toString()); + if (!fs.exists(path)) { + // special case: no instances have ever been created + return new HashMap(0); + } + FileStatus[] statuses = fs.listStatus(path); + Map instances = new HashMap(statuses.length); + + // enum the child entries + for (FileStatus status : statuses) { + if (status.isDirectory()) { + // for directories, look for an internal.json underneath + Path child = status.getPath(); + Path internalJson = new Path(child, Filenames.INTERNAL); + if (fs.exists(internalJson)) { + // success => this is an instance + instances.put(child.getName(), child); + } else { + log.info("Malformed cluster found at {}. It does not appear to be a valid persisted instance.", + child.toString()); + } + } + } + return instances; + } + + public void touch(Path path, boolean overwrite) throws IOException { + FSDataOutputStream out = null; + try { + out = fileSystem.create(path, overwrite); + } finally { + IOUtils.closeStream(out); + } + } + + public void cat(Path path, boolean overwrite, String data) throws IOException { + FSDataOutputStream out = null; + try { + out = fileSystem.create(path, overwrite); + byte[] bytes = data.getBytes(Charset.forName("UTF-8")); + out.write(bytes); + } finally { + IOUtils.closeStream(out); + } + } + + public String cat(Path path) throws IOException { + FileStatus status = fileSystem.getFileStatus(path); + byte[] b = new byte[(int) status.getLen()]; + FSDataInputStream in = null; + try { + in = fileSystem.open(path); + int count = in.read(b); + return new String(b, 0, count, UTF_8); + } finally { + IOUtils.closeStream(in); + } + } + + /** + * Create a path that must exist in the cluster fs + * @param uri uri to create + * @return the path + * @throws SliderException if the path does not exist + */ + public Path createPathThatMustExist(String uri) throws + SliderException, IOException { + Preconditions.checkNotNull(uri); + Path path = new Path(uri); + verifyPathExists(path); + return path; + } + + /** + * Locate an application conf json in the FS. This includes a check to verify + * that the file is there. + * + * @param clustername name of the cluster + * @return the path to the spec. + * @throws IOException IO problems + * @throws SliderException if the path isn't there + */ + public Path locateInstanceDefinition(String clustername) throws IOException, + SliderException { + Path clusterDirectory = buildClusterDirPath(clustername); + Path appConfPath = + new Path(clusterDirectory, Filenames.APPCONF); + verifyClusterSpecExists(clustername, appConfPath); + return appConfPath; + } + + /** + * Verify that a cluster specification exists + * @param clustername name of the cluster (For errors only) + * @param clusterSpecPath cluster specification path + * @throws IOException IO problems + * @throws SliderException if the cluster specification is not present + */ + public void verifyClusterSpecExists(String clustername, Path clusterSpecPath) + throws IOException, + SliderException { + if (!fileSystem.isFile(clusterSpecPath)) { + log.debug("Missing specification file {}", clusterSpecPath); + throw UnknownApplicationInstanceException.unknownInstance( + clustername + "\n (definition not found at " + clusterSpecPath); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a379904d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java new file mode 100644 index 0000000..e5fa424 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/Duration.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.common.tools; + +import java.io.Closeable; + +/** + * A duration in milliseconds. This class can be used + * to count time, and to be polled to see if a time limit has + * passed. + */ +public class Duration implements Closeable { + public long start, finish; + public final long limit; + + /** + * Create a duration instance with a limit of 0 + */ + public Duration() { + this(0); + } + + /** + * Create a duration with a limit specified in millis + * @param limit duration in milliseconds + */ + public Duration(long limit) { + this.limit = limit; + } + + /** + * Start + * @return self + */ + public Duration start() { + start = now(); + return this; + } + + /** + * The close operation relays to {@link #finish()}. + * Implementing it allows Duration instances to be automatically + * finish()'d in Java7 try blocks for when used in measuring durations. + */ + @Override + public final void close() { + finish(); + } + + public void finish() { + finish = now(); + } + + protected long now() { + return System.nanoTime()/1000000; + } + + public long getInterval() { + return finish - start; + } + + /** + * return true if the limit has been exceeded + * @return true if a limit was set and the current time + * exceeds it. + */ + public boolean getLimitExceeded() { + return limit >= 0 && ((now() - start) > limit); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("Duration"); + if (finish >= start) { + builder.append(" finished at ").append(getInterval()).append(" millis;"); + } else { + if (start > 0) { + builder.append(" started but not yet finished;"); + } else { + builder.append(" unstarted;"); + } + } + if (limit > 0) { + builder.append(" limit: ").append(limit).append(" millis"); + if (getLimitExceeded()) { + builder.append(" - exceeded"); + } + } + return builder.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a379904d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java new file mode 100644 index 0000000..64783b6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.common.tools; + +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.core.exceptions.SliderException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * a scanner which can take an input string for a range or scan the lot. + */ +public class PortScanner { + private static Pattern NUMBER_RANGE = Pattern.compile("^(\\d+)\\s*-\\s*(\\d+)$"); + private static Pattern SINGLE_NUMBER = Pattern.compile("^\\d+$"); + + private List remainingPortsToCheck; + + public PortScanner() { + } + + public void setPortRange(String input) throws BadConfigException { + // first split based on commas + Set inputPorts= new TreeSet(); + String[] ranges = input.split(","); + for ( String range : ranges ) { + if (range.trim().isEmpty()) { + continue; + } + Matcher m = SINGLE_NUMBER.matcher(range.trim()); + if (m.find()) { + inputPorts.add(Integer.parseInt(m.group())); + continue; + } + m = NUMBER_RANGE.matcher(range.trim()); + if (m.find()) { + String[] boundaryValues = m.group(0).split("-"); + int start = Integer.parseInt(boundaryValues[0].trim()); + int end = Integer.parseInt(boundaryValues[1].trim()); + if (end < start) { + throw new BadConfigException("End of port range is before start: " + + range + " in input: " + input); + } + for (int i = start; i < end + 1; i++) { + inputPorts.add(i); + } + continue; + } + throw new BadConfigException("Bad port range: " + range + " in input: " + + input); + } + if (inputPorts.size() == 0) { + throw new BadConfigException("No ports found in range: " + input); + } + this.remainingPortsToCheck = new ArrayList(inputPorts); + } + + public List getRemainingPortsToCheck() { + return remainingPortsToCheck; + } + + public int getAvailablePort() throws SliderException, IOException { + if (remainingPortsToCheck != null) { + return getAvailablePortViaPortArray(); + } else { + return SliderUtils.getOpenPort(); + } + } + + private int getAvailablePortViaPortArray() throws SliderException { + boolean found = false; + int availablePort = -1; + Iterator portsToCheck = this.remainingPortsToCheck.iterator(); + while (portsToCheck.hasNext() && !found) { + int portToCheck = portsToCheck.next(); + found = SliderUtils.isPortAvailable(portToCheck); + if (found) { + availablePort = portToCheck; + portsToCheck.remove(); + } + } + + if (availablePort < 0) { + throw new SliderException(SliderExitCodes.EXIT_BAD_CONFIGURATION, + "No available ports found in configured range {}", + remainingPortsToCheck); + } + + return availablePort; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a379904d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java new file mode 100644 index 0000000..294f37e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/common/tools/SliderFileSystem.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.common.tools; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; + +/** + * Extends Core Filesystem with operations to manipulate ClusterDescription + * persistent state + */ +public class SliderFileSystem extends CoreFileSystem { + + public SliderFileSystem(FileSystem fileSystem, + Configuration configuration) { + super(fileSystem, configuration); + } + + public SliderFileSystem(Configuration configuration) throws IOException { + super(configuration); + } + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org