Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5711C10F35 for ; Tue, 3 Mar 2015 11:07:11 +0000 (UTC) Received: (qmail 39881 invoked by uid 500); 3 Mar 2015 11:07:11 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 39844 invoked by uid 500); 3 Mar 2015 11:07:11 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 39834 invoked by uid 99); 3 Mar 2015 11:07:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 11:07:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 03 Mar 2015 11:07:02 +0000 Received: (qmail 36354 invoked by uid 99); 3 Mar 2015 11:06:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 11:06:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BD668E0050; Tue, 3 Mar 2015 11:06:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 03 Mar 2015 11:07:22 -0000 Message-Id: In-Reply-To: <167d7f59418a4984b72d8bb0b0290e2f@git.apache.org> References: <167d7f59418a4984b72d8bb0b0290e2f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] [abbrv] 
incubator-ignite git commit: # IGNITE-386: Reworked API in Hadoop module. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java deleted file mode 100644 index 5475cf4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java +++ /dev/null @@ -1,1008 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.igfs.hadoop.v2; - -import org.apache.commons.logging.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.hadoop.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.igfs.hadoop.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.IgniteFs.*; -import static org.apache.ignite.configuration.IgfsConfiguration.*; -import static org.apache.ignite.igfs.IgfsMode.*; -import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; - -/** - * {@code IGFS} Hadoop 2.x file system driver over file system API. To use - * {@code IGFS} as Hadoop file system, you should configure this class - * in Hadoop's {@code core-site.xml} as follows: - *
- *  <property>
- *      <name>fs.default.name</name>
- *      <value>igfs://ipc</value>
- *  </property>
- *
- *  <property>
- *      <name>fs.igfs.impl</name>
- *      <value>org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem</value>
- *  </property>
- * 
- * You should also add Ignite JAR and all libraries to Hadoop classpath. To - * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop - * distribution: - *
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * 
- *

Data vs Clients Nodes

- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on - * data nodes. Client nodes are responsible for basic file system operations as well as - * accessing data nodes remotely. Usually, client nodes are started together - * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually - * started together with Hadoop {@code task-tracker} processes. - *

- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} - * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation. - */ -public class IgfsHadoopFileSystem extends AbstractFileSystem implements Closeable { - /** Logger. */ - private static final Log LOG = LogFactory.getLog(IgfsHadoopFileSystem.class); - - /** Ensures that close routine is invoked at most once. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Grid remote client. */ - private IgfsHadoopWrapper rmtClient; - - /** Working directory. */ - private IgfsPath workingDir; - - /** URI. */ - private URI uri; - - /** Authority. */ - private String uriAuthority; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Server block size. */ - private long grpBlockSize; - - /** Default replication factor. */ - private short dfltReplication; - - /** Secondary URI string. */ - private URI secondaryUri; - - /** Mode resolver. */ - private IgfsModeResolver modeRslvr; - - /** Secondary file system instance. */ - private AbstractFileSystem secondaryFs; - - /** Whether custom sequential reads before prefetch value is provided. */ - private boolean seqReadsBeforePrefetchOverride; - - /** Custom-provided sequential reads before prefetch. */ - private int seqReadsBeforePrefetch; - - /** Flag that controls whether file writes should be colocated on data node. */ - private boolean colocateFileWrites; - - /** Prefer local writes. */ - private boolean preferLocFileWrites; - - /** - * @param name URI for file system. - * @param cfg Configuration. - * @throws URISyntaxException if name has invalid syntax. - * @throws IOException If initialization failed. 
- */ - public IgfsHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException { - super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1); - - uri = name; - - try { - initialize(name, cfg); - } - catch (IOException e) { - // Close client if exception occurred. - if (rmtClient != null) - rmtClient.close(false); - - throw e; - } - - workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** {@inheritDoc} */ - @Override public void checkPath(Path path) { - URI uri = path.toUri(); - - if (uri.isAbsolute()) { - if (!F.eq(uri.getScheme(), IGFS_SCHEME)) - throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + - uri.getAuthority() + ']'); - - if (!F.eq(uri.getAuthority(), uriAuthority)) - throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + - uri.getAuthority() + ']'); - } - } - - /** - * Public setter that can be used by direct users of FS or Visor. - * - * @param colocateFileWrites Whether all ongoing file writes should be colocated. - */ - @SuppressWarnings("UnusedDeclaration") - public void colocateFileWrites(boolean colocateFileWrites) { - this.colocateFileWrites = colocateFileWrites; - } - - /** - * Enter busy state. - * - * @throws IOException If file system is stopped. - */ - private void enterBusy() throws IOException { - if (closeGuard.get()) - throw new IOException("File system is stopped."); - } - - /** - * Leave busy state. - */ - private void leaveBusy() { - // No-op. - } - - /** - * @param name URI passed to constructor. - * @param cfg Configuration passed to constructor. - * @throws IOException If initialization failed. 
- */ - private void initialize(URI name, Configuration cfg) throws IOException { - enterBusy(); - - try { - if (rmtClient != null) - throw new IOException("File system is already initialized: " + rmtClient); - - A.notNull(name, "name"); - A.notNull(cfg, "cfg"); - - if (!IGFS_SCHEME.equals(name.getScheme())) - throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + - "://[name]/[optional_path], actual=" + name + ']'); - - uriAuthority = name.getAuthority(); - - // Override sequential reads before prefetch if needed. - seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); - - if (seqReadsBeforePrefetch > 0) - seqReadsBeforePrefetchOverride = true; - - // In Ignite replication factor is controlled by data cache affinity. - // We use replication factor to force the whole file to be stored on local node. - dfltReplication = (short)cfg.getInt("dfs.replication", 3); - - // Get file colocation control flag. - colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); - preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); - - // Get log directory. - String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); - - File logDirFile = U.resolveIgnitePath(logDirCfg); - - String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - - rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG); - - // Handshake. - IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); - - grpBlockSize = handshake.blockSize(); - - IgfsPaths paths = handshake.secondaryPaths(); - - Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); - - if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { - // Initiate client logger. 
- if (logDir == null) - throw new IOException("Failed to resolve log directory: " + logDirCfg); - - Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); - - clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); - } - else - clientLog = IgfsLogger.disabledLogger(); - - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); - - boolean initSecondary = paths.defaultMode() == PROXY; - - if (paths.pathModes() != null) { - for (T2 pathMode : paths.pathModes()) { - IgfsMode mode = pathMode.getValue(); - - initSecondary |= mode == PROXY; - } - } - - if (initSecondary) { - Map props = paths.properties(); - - String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI); - String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH); - - if (secConfPath == null) - throw new IOException("Failed to connect to the secondary file system because configuration " + - "path is not provided."); - - if (secUri == null) - throw new IOException("Failed to connect to the secondary file system because URI is not " + - "provided."); - - if (secConfPath == null) - throw new IOException("Failed to connect to the secondary file system because configuration " + - "path is not provided."); - - if (secUri == null) - throw new IOException("Failed to connect to the secondary file system because URI is not " + - "provided."); - - try { - secondaryUri = new URI(secUri); - - URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath); - - if (secondaryCfgUrl == null) - throw new IOException("Failed to resolve secondary file system config URL: " + secConfPath); - - Configuration conf = new Configuration(); - - conf.addResource(secondaryCfgUrl); - - String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme()); - - conf.setBoolean(prop, true); - - secondaryFs = AbstractFileSystem.get(secondaryUri, conf); - } - catch (URISyntaxException ignore) { - throw 
new IOException("Failed to resolve secondary file system URI: " + secUri); - } - catch (IOException e) { - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); - } - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (rmtClient == null) - return; - - rmtClient.close(false); - - if (clientLog.isLogEnabled()) - clientLog.close(); - - // Reset initialized resources. - rmtClient = null; - } - } - - /** {@inheritDoc} */ - @Override public URI getUri() { - return uri; - } - - /** {@inheritDoc} */ - @Override public int getUriDefaultPort() { - return -1; - } - - /** {@inheritDoc} */ - @Override public FsServerDefaults getServerDefaults() throws IOException { - return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024, - false, 0, DataChecksum.Type.NULL); - } - - /** {@inheritDoc} */ - @Override public boolean setReplication(Path f, short replication) throws IOException { - return mode(f) == PROXY && secondaryFs.setReplication(f, replication); - } - - /** {@inheritDoc} */ - @Override public void setTimes(Path f, long mtime, long atime) throws IOException { - if (mode(f) == PROXY) - secondaryFs.setTimes(f, mtime, atime); - else { - if (mtime == -1 && atime == -1) - return; - - rmtClient.setTimes(convert(f), atime, mtime); - } - } - - /** {@inheritDoc} */ - @Override public FsStatus getFsStatus() throws IOException { - IgfsStatus status = rmtClient.fsStatus(); - - return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed()); - } - - /** {@inheritDoc} */ - @Override public void setPermission(Path p, FsPermission perm) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) - secondaryFs.setPermission(toSecondary(p), perm); - else { - if (rmtClient.update(convert(p), permission(perm)) == null) - 
throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", perm=" + perm + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setOwner(Path p, String usr, String grp) throws IOException { - A.notNull(p, "p"); - A.notNull(usr, "username"); - A.notNull(grp, "grpName"); - - enterBusy(); - - try { - if (mode(p) == PROXY) - secondaryFs.setOwner(toSecondary(p), usr, grp); - else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null) - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']'); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize); - - if (clientLog.isLogEnabled()) { - // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFs.getFileStatus(toSecondary(f)); - - long size = status != null ? status.getLen() : -1; - - long logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, PROXY, bufSize, size); - - return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId)); - } - else - return is; - } - else { - IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ? 
- rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, mode, bufSize, stream.length()); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + - ", bufSize=" + bufSize + ']'); - - IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(), - bufSize, LOG, clientLog, logId); - - if (LOG.isDebugEnabled()) - LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); - - return new FSDataInputStream(igfsIn); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream createInternal( - Path f, - EnumSet flag, - FsPermission perm, - int bufSize, - short replication, - long blockSize, - Progressable progress, - Options.ChecksumOpt checksumOpt, - boolean createParent - ) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - boolean overwrite = flag.contains(CreateFlag.OVERWRITE); - boolean append = flag.contains(CreateFlag.APPEND); - boolean create = flag.contains(CreateFlag.CREATE); - - OutputStream out = null; - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" + - path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize, - replication, blockSize, progress, checksumOpt, createParent); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - if (append) - clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. 
- else - clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - Map permMap = F.asMap(PROP_PERMISSION, toString(perm), - PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); - - // Create stream and close it in the 'finally' section if any sequential operation failed. - IgfsHadoopStreamDelegate stream; - - long logId = -1; - - if (append) { - stream = rmtClient.append(path, create, permMap); - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, mode, bufSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - } - else { - stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, - permMap); - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - } - - assert stream != null; - - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, - clientLog, logId); - - bufSize = Math.max(64 * 1024, bufSize); - - out = new BufferedOutputStream(igfsOut, bufSize); - - FSDataOutputStream res = new FSDataOutputStream(out, null, 0); - - // Mark stream created successfully. - out = null; - - return res; - } - } - finally { - // Close if failed during stream creation. 
- if (out != null) - U.closeQuiet(out); - - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean supportsSymlinks() { - return false; - } - - /** {@inheritDoc} */ - @Override public void renameInternal(Path src, Path dst) throws IOException { - A.notNull(src, "src"); - A.notNull(dst, "dst"); - - enterBusy(); - - try { - IgfsPath srcPath = convert(src); - IgfsPath dstPath = convert(dst); - Set childrenModes = modeRslvr.resolveChildrenModes(srcPath); - - if (childrenModes.contains(PROXY)) { - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, PROXY, dstPath); - - secondaryFs.renameInternal(toSecondary(src), toSecondary(dst)); - } - - rmtClient.rename(srcPath, dstPath); - - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, modeRslvr.resolveMode(srcPath), dstPath); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - Set childrenModes = modeRslvr.resolveChildrenModes(path); - - if (childrenModes.contains(PROXY)) { - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, PROXY, recursive); - - return secondaryFs.delete(toSecondary(f), recursive); - } - - boolean res = rmtClient.delete(path, recursive); - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, mode, recursive); - - return res; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { - // Checksum has effect for secondary FS only. 
- if (secondaryFs != null) - secondaryFs.setVerifyChecksum(verifyChecksum); - } - - /** {@inheritDoc} */ - @Override public FileChecksum getFileChecksum(Path f) throws IOException { - if (mode(f) == PROXY) - return secondaryFs.getFileChecksum(f); - - return null; - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); - - if (arr == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - for (int i = 0; i < arr.length; i++) - arr[i] = toPrimary(arr[i]); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, PROXY, fileArr); - } - - return arr; - } - else { - Collection list = rmtClient.listFiles(path); - - if (list == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - List files = new ArrayList<>(list); - - FileStatus[] arr = new FileStatus[files.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(files.get(i)); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, mode, fileArr); - } - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, PROXY); - - secondaryFs.mkdir(toSecondary(f), perm, createParent); - } - else { 
- rmtClient.mkdirs(path, permission(perm)); - - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, mode); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - if (mode(f) == PROXY) - return toPrimary(secondaryFs.getFileStatus(toSecondary(f))); - else { - IgfsFile info = rmtClient.info(convert(f)); - - if (info == null) - throw new FileNotFoundException("File not found: " + f); - - return convert(info); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException { - A.notNull(path, "path"); - - IgfsPath igfsPath = convert(path); - - enterBusy(); - - try { - if (modeRslvr.resolveMode(igfsPath) == PROXY) - return secondaryFs.getFileBlockLocations(path, start, len); - else { - long now = System.currentTimeMillis(); - - List affinity = new ArrayList<>( - rmtClient.affinity(igfsPath, start, len)); - - BlockLocation[] arr = new BlockLocation[affinity.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(affinity.get(i)); - - if (LOG.isDebugEnabled()) - LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + - (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** - * Resolve path mode. - * - * @param path HDFS path. - * @return Path mode. - */ - public IgfsMode mode(Path path) { - return modeRslvr.resolveMode(convert(path)); - } - - /** - * Convert the given path to path acceptable by the primary file system. - * - * @param path Path. - * @return Primary file system path. - */ - private Path toPrimary(Path path) { - return convertPath(path, getUri()); - } - - /** - * Convert the given path to path acceptable by the secondary file system. - * - * @param path Path. 
- * @return Secondary file system path. - */ - private Path toSecondary(Path path) { - assert secondaryFs != null; - assert secondaryUri != null; - - return convertPath(path, secondaryUri); - } - - /** - * Convert path using the given new URI. - * - * @param path Old path. - * @param newUri New URI. - * @return New path. - */ - private Path convertPath(Path path, URI newUri) { - assert newUri != null; - - if (path != null) { - URI pathUri = path.toUri(); - - try { - return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, - pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); - } - catch (URISyntaxException e) { - throw new IgniteException("Failed to construct secondary file system path from the primary file " + - "system path: " + path, e); - } - } - else - return null; - } - - /** - * Convert a file status obtained from the secondary file system to a status of the primary file system. - * - * @param status Secondary file system status. - * @return Primary file system status. - */ - private FileStatus toPrimary(FileStatus status) { - return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(), - status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), - status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - return new Path(IGFS_SCHEME, uriAuthority, path.toString()); - } - - /** - * Convert Hadoop path into IGFS path. - * - * @param path Hadoop path. - * @return IGFS path. - */ - @Nullable private IgfsPath convert(Path path) { - if (path == null) - return null; - - return path.isAbsolute() ? 
new IgfsPath(path.toUri().getPath()) : - new IgfsPath(workingDir, path.toUri().getPath()); - } - - /** - * Convert IGFS affinity block location into Hadoop affinity block location. - * - * @param block IGFS affinity block location. - * @return Hadoop affinity block location. - */ - private BlockLocation convert(IgfsBlockLocation block) { - Collection names = block.names(); - Collection hosts = block.hosts(); - - return new BlockLocation( - names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, - hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, - block.start(), block.length() - ) { - @Override public String toString() { - try { - return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + - ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - } - - /** - * Convert IGFS file information into Hadoop file status. - * - * @param file IGFS file information. - * @return Hadoop file status. - */ - private FileStatus convert(IgfsFile file) { - return new FileStatus( - file.length(), - file.isDirectory(), - dfltReplication, - file.groupBlockSize(), - file.modificationTime(), - file.accessTime(), - permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), - file.property(PROP_GROUP_NAME, "users"), - convert(file.path())) { - @Override public String toString() { - return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]"; - } - }; - } - - /** - * Convert Hadoop permission into IGFS file attribute. - * - * @param perm Hadoop permission. - * @return IGFS attributes. - */ - private Map permission(FsPermission perm) { - if (perm == null) - perm = FsPermission.getDefault(); - - return F.asMap(PROP_PERMISSION, toString(perm)); - } - - /** - * @param perm Permission. - * @return String. 
- */ - private static String toString(FsPermission perm) { - return String.format("%04o", perm.toShort()); - } - - /** - * Convert IGFS file attributes into Hadoop permission. - * - * @param file File info. - * @return Hadoop permission. - */ - private FsPermission permission(IgfsFile file) { - String perm = file.property(PROP_PERMISSION, null); - - if (perm == null) - return FsPermission.getDefault(); - - try { - return new FsPermission((short)Integer.parseInt(perm, 8)); - } - catch (NumberFormatException ignore) { - return FsPermission.getDefault(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopFileSystem.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html deleted file mode 100644 index 6df66f4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - Contains Hadoop 2.x FileSystem wrapper for Ignite file system. - - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html deleted file mode 100644 index ec38a21..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - Contains IGnite File System APIs. 
- - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java new file mode 100644 index 0000000..bdf8fc6 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.counters.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop counter group adapter. + */ +class HadoopCounterGroup implements CounterGroup { + /** Counters. */ + private final HadoopCounters cntrs; + + /** Group name. */ + private final String name; + + /** + * Creates new instance. + * + * @param cntrs Client counters instance. + * @param name Group name. 
+ */ + HadoopCounterGroup(HadoopCounters cntrs, String name) { + this.cntrs = cntrs; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String getName() { + return name; + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return name; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void addCounter(Counter counter) { + addCounter(counter.getName(), counter.getDisplayName(), 0); + } + + /** {@inheritDoc} */ + @Override public Counter addCounter(String name, String displayName, long value) { + final Counter counter = cntrs.findCounter(this.name, name); + + counter.setValue(value); + + return counter; + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName, String displayName) { + return cntrs.findCounter(name, counterName); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName, boolean create) { + return cntrs.findCounter(name, counterName, create); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName) { + return cntrs.findCounter(name, counterName); + } + + /** {@inheritDoc} */ + @Override public int size() { + return cntrs.groupSize(name); + } + + /** {@inheritDoc} */ + @Override public void incrAllCounters(CounterGroupBase rightGroup) { + for (final Counter counter : rightGroup) + cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); + } + + /** {@inheritDoc} */ + @Override public CounterGroupBase getUnderlyingGroup() { + return this; + } + + /** {@inheritDoc} */ + @Override public Iterator iterator() { + return cntrs.iterateGroup(name); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new 
UnsupportedOperationException("not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java new file mode 100644 index 0000000..c7f0157 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.counters.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop counters adapter. 
+ */ +public class HadoopCounters extends Counters { + /** */ + private final Map,GridHadoopLongCounter> cntrs = new HashMap<>(); + + /** + * Creates new instance based on given counters. + * + * @param cntrs Counters to adapt. + */ + public HadoopCounters(GridHadoopCounters cntrs) { + for (GridHadoopCounter cntr : cntrs.all()) + if (cntr instanceof GridHadoopLongCounter) + this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr); + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup addGroup(CounterGroup grp) { + return addGroup(grp.getName(), grp.getDisplayName()); + } + + /** {@inheritDoc} */ + @Override public CounterGroup addGroup(String name, String displayName) { + return new HadoopCounterGroup(this, name); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String grpName, String cntrName) { + return findCounter(grpName, cntrName, true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(Enum key) { + return findCounter(key.getDeclaringClass().getName(), key.name(), true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { + return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); + } + + /** {@inheritDoc} */ + @Override public synchronized Iterable getGroupNames() { + Collection res = new HashSet<>(); + + for (GridHadoopCounter counter : cntrs.values()) + res.add(counter.group()); + + return res; + } + + /** {@inheritDoc} */ + @Override public Iterator iterator() { + final Iterator iter = getGroupNames().iterator(); + + return new Iterator() { + @Override public boolean hasNext() { + return iter.hasNext(); + } + + @Override public CounterGroup next() { + if (!hasNext()) + throw new NoSuchElementException(); + + return new HadoopCounterGroup(HadoopCounters.this, iter.next()); + } + + @Override public void remove() { + throw new UnsupportedOperationException("not 
implemented"); + } + }; + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup getGroup(String grpName) { + return new HadoopCounterGroup(this, grpName); + } + + /** {@inheritDoc} */ + @Override public synchronized int countCounters() { + return cntrs.size(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void incrAllCounters(AbstractCounters other) { + for (CounterGroup group : other) { + for (Counter counter : group) { + findCounter(group.getName(), counter.getName()).increment(counter.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object genericRight) { + if (!(genericRight instanceof HadoopCounters)) + return false; + + return cntrs.equals(((HadoopCounters) genericRight).cntrs); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return cntrs.hashCode(); + } + + /** {@inheritDoc} */ + @Override public void setWriteAllCounters(boolean snd) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean getWriteAllCounters() { + return true; + } + + /** {@inheritDoc} */ + @Override public Limits limits() { + return null; + } + + /** + * Returns size of a group. + * + * @param grpName Name of the group. + * @return amount of counters in the given group. + */ + public int groupSize(String grpName) { + int res = 0; + + for (GridHadoopCounter counter : cntrs.values()) { + if (grpName.equals(counter.group())) + res++; + } + + return res; + } + + /** + * Returns counters iterator for specified group. + * + * @param grpName Name of the group to iterate. + * @return Counters iterator. 
+ */ + public Iterator iterateGroup(String grpName) { + Collection grpCounters = new ArrayList<>(); + + for (GridHadoopLongCounter counter : cntrs.values()) { + if (grpName.equals(counter.group())) + grpCounters.add(new GridHadoopV2Counter(counter)); + } + + return grpCounters.iterator(); + } + + /** + * Find a counter in the group. + * + * @param grpName The name of the counter group. + * @param cntrName The name of the counter. + * @param create Create the counter if not found if true. + * @return The counter that was found or added or {@code null} if create is false. + */ + public Counter findCounter(String grpName, String cntrName, boolean create) { + T2 key = new T2<>(grpName, cntrName); + + GridHadoopLongCounter internalCntr = cntrs.get(key); + + if (internalCntr == null & create) { + internalCntr = new GridHadoopLongCounter(grpName,cntrName); + + cntrs.put(key, new GridHadoopLongCounter(grpName,cntrName)); + } + + return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java index 52e7d29..e1bf9b6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.igfs.hadoop.v1.*; +import 
org.apache.ignite.hadoop.fs.v1.*; /** * Utilities for configuring file systems to support the separate working directory per each thread. @@ -36,8 +36,8 @@ public class GridHadoopFileSystemsUtils { * @param userName User name. */ public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgfsHadoopFileSystem) - ((IgfsHadoopFileSystem)fs).setUser(userName); + if (fs instanceof IgniteHadoopFileSystem) + ((IgniteHadoopFileSystem)fs).setUser(userName); else if (fs instanceof GridHadoopDistributedFileSystem) ((GridHadoopDistributedFileSystem)fs).setUser(userName); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java new file mode 100644 index 0000000..36fabb7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +/** + * This class lists parameters that can be specified in Hadoop configuration. + * Hadoop configuration can be specified in {@code core-site.xml} file + * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: + *

    + *
  • + * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides + * the one specified in {@link org.apache.ignite.configuration.IgfsConfiguration#getSequentialReadsBeforePrefetch()} + * IGFS data node configuration property. + *
  • + *
  • + * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If + * {@code true}, then all file system operations will be logged to a file. + *
  • + *
  • {@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.
  • + *
  • + * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before + * it gets flushed to log file. Higher values will imply greater performance, but will increase delay + * before record appears in the log file. + *
  • + *
  • + * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data + * node to which client is connected. If {@code true}, file will not be distributed and will be written + * to a single data node. Default value is {@code true}. + *
  • + *
  • + * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to + * local data node if it has enough free space. After some time it can be redistributed across nodes though. + *
  • + *
+ * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in + * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}. + *

+ * Sample configuration that can be placed in {@code core-site.xml} file: + *

+ *     <property>
+ *         <name>fs.igfs.127.0.0.1:10500.log.enabled</name>
+ *         <value>true</value>
+ *     </property>
+ *     <property>
+ *         <name>fs.igfs.127.0.0.1:10500.log.dir</name>
+ *         <value>/home/apache/ignite/log/sampling</value>
+ *     </property>
+ *     <property>
+ *         <name>fs.igfs.127.0.0.1:10500.log.batch_size</name>
+ *         <value>16</value>
+ *     </property>
+ * 
+ * Parameters could also be specified per mapreduce job, e.g. + *
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
+ * 
+ * If you want to use these parameters in code, then you have to substitute your file system name in it. The easiest + * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}. + */ +public class HadoopParameters { + /** Parameter name for control over file colocation write mode. */ + public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes"; + + /** Parameter name for custom sequential reads before prefetch value. */ + public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH = + "fs.igfs.%s.open.sequential_reads_before_prefetch"; + + /** Parameter name for client logger directory. */ + public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir"; + + /** Parameter name for log batch size. */ + public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size"; + + /** Parameter name for log enabled flag. */ + public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled"; + + /** Parameter name for prefer local writes flag. */ + public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes"; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java new file mode 100644 index 0000000..4c83ace --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.protocol.*; +import org.apache.hadoop.mapreduce.security.token.delegation.*; +import org.apache.hadoop.mapreduce.v2.*; +import org.apache.hadoop.mapreduce.v2.jobhistory.*; +import org.apache.hadoop.security.*; +import org.apache.hadoop.security.authorize.*; +import org.apache.hadoop.security.token.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; + +/** + * Hadoop client protocol. + */ +public class HadoopClientProtocol implements ClientProtocol { + /** Ignite framework name property. */ + public static final String FRAMEWORK_NAME = "ignite"; + + /** Protocol version. */ + private static final long PROTO_VER = 1L; + + /** Default Ignite system directory. 
*/ + private static final String SYS_DIR = ".ignite/system"; + + /** Configuration. */ + private final Configuration conf; + + /** Ignite client. */ + private volatile GridClient cli; + + /** Last received version. */ + private long lastVer = -1; + + /** Last received status. */ + private GridHadoopJobStatus lastStatus; + + /** + * Constructor. + * + * @param conf Configuration. + * @param cli Ignite client. + */ + public HadoopClientProtocol(Configuration conf, GridClient cli) { + assert cli != null; + + this.conf = conf; + this.cli = cli; + } + + /** {@inheritDoc} */ + @Override public JobID getNewJobID() throws IOException, InterruptedException { + try { + conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null); + + conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); + + return new JobID(jobID.globalId().toString(), jobID.localId()); + } + catch (GridClientException e) { + throw new IOException("Failed to get new job ID.", e); + } + } + + /** {@inheritDoc} */ + @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, + InterruptedException { + try { + conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); + + GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(), + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); + + if (status == null) + throw new IOException("Failed to submit job (null status obtained): " + jobId); + + return processStatus(status); + } + catch (GridClientException | IgniteCheckedException e) { + throw new IOException("Failed to submit job.", e); + } + } + + /** {@inheritDoc} */ + @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { + return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0); + } + + /** 
{@inheritDoc} */ + @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException { + return Cluster.JobTrackerStatus.RUNNING; + } + + /** {@inheritDoc} */ + @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public AccessControlList getQueueAdmins(String queueName) throws IOException { + return new AccessControlList("*"); + } + + /** {@inheritDoc} */ + @Override public void killJob(JobID jobId) throws IOException, InterruptedException { + try { + cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(), + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + } + catch (GridClientException e) { + throw new IOException("Failed to kill job: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, + InterruptedException { + return false; + } + + /** {@inheritDoc} */ + @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { + try { + Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); + + GridHadoopProtocolTaskArguments args = delay >= 0 ? 
+ new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); + + GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args); + + if (status == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return processStatus(status); + } + catch (GridClientException e) { + throw new IOException("Failed to get job status: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { + try { + final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(), + new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + + if (counters == null) + throw new IOException("Job tracker doesn't have any information about the job: " + jobId); + + return new HadoopCounters(counters); + } + catch (GridClientException e) { + throw new IOException("Failed to get job counters: " + jobId, e); + } + } + + /** {@inheritDoc} */ + @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException { + return new TaskReport[0]; + } + + /** {@inheritDoc} */ + @Override public String getFilesystemName() throws IOException, InterruptedException { + return FileSystem.get(conf).getUri().toString(); + } + + /** {@inheritDoc} */ + @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException { + return new JobStatus[0]; + } + + /** {@inheritDoc} */ + @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents) + throws IOException, InterruptedException { + return new TaskCompletionEvent[0]; + } + + /** {@inheritDoc} */ + @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException { + return new 
String[0]; + } + + /** {@inheritDoc} */ + @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { + return new TaskTrackerInfo[0]; + } + + /** {@inheritDoc} */ + @Override public String getSystemDir() throws IOException, InterruptedException { + Path sysDir = new Path(SYS_DIR); + + return sysDir.toString(); + } + + /** {@inheritDoc} */ + @Override public String getStagingAreaDir() throws IOException, InterruptedException { + String usr = UserGroupInformation.getCurrentUser().getShortUserName(); + + return GridHadoopUtils.stagingAreaDir(conf, usr).toString(); + } + + /** {@inheritDoc} */ + @Override public String getJobHistoryDir() throws IOException, InterruptedException { + return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { + return new QueueAclsInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException { + return new QueueInfo[0]; + } + + /** {@inheritDoc} */ + @Override public Token getDelegationToken(Text renewer) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long renewDelegationToken(Token token) throws IOException, + 
InterruptedException { + return 0; + } + + /** {@inheritDoc} */ + @Override public void cancelDelegationToken(Token token) throws IOException, + InterruptedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException, + InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return PROTO_VER; + } + + /** {@inheritDoc} */ + @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); + } + + /** + * Process received status update. + * + * @param status Ignite status. + * @return Hadoop status. + */ + private JobStatus processStatus(GridHadoopJobStatus status) { + // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because + // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class + // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will + // change in future and either protocol will serve statuses for several jobs or status update will not be + // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap). 
+ // (vozerov) + if (lastVer < status.version()) { + lastVer = status.version(); + + lastStatus = status; + } + else + assert lastStatus != null; + + return GridHadoopUtils.status(lastStatus, conf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java index 41bd24a..82be91f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java @@ -39,7 +39,7 @@ import org.jetbrains.annotations.*; import java.io.*; import java.util.*; -import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java deleted file mode 100644 index 007172a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite_new.hadoop.fs;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.ipc.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.igfs.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite_new.filesystem.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * Adapter that exposes any Hadoop {@link org.apache.hadoop.fs.FileSystem} through the
 * {@link SecondaryFileSystem} interface. Every operation is delegated to the wrapped Hadoop file system and
 * any {@link IOException} it raises is translated into an {@link IgfsException}.
 */
public class IgniteHadoopSecondaryFileSystem implements SecondaryFileSystem, AutoCloseable {
    /** Property name for path to Hadoop configuration. */
    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";

    /** Property name for URI of file system. */
    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";

    /** Hadoop file system. */
    private final FileSystem fileSys;

    /** Properties of file system. */
    private final Map<String, String> props = new HashMap<>();

    /**
     * Constructor.
     *
     * @param uri URI of file system. If {@code null}, the file system is obtained from the configuration alone.
     * @param cfgPath Additional path to Hadoop configuration.
     * @throws org.apache.ignite.IgniteCheckedException If the configuration path cannot be resolved, the URI is
     *      malformed or the Hadoop file system cannot be obtained.
     */
    public IgniteHadoopSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
        Configuration cfg = new Configuration();

        if (cfgPath != null) {
            URL cfgUrl = U.resolveIgniteUrl(cfgPath);

            // A configuration path that was given explicitly must be resolvable: fail fast instead of handing
            // a null resource to the Hadoop configuration.
            if (cfgUrl == null)
                throw new IgniteCheckedException("Failed to resolve secondary file system configuration path " +
                    "[cfgPath=" + cfgPath + ']');

            cfg.addResource(cfgUrl);
        }

        try {
            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
        }
        catch (IOException | URISyntaxException e) {
            throw new IgniteCheckedException(e);
        }

        // Publish the URI reported by the file system itself, always terminated with a slash.
        String fsUri = fileSys.getUri().toString();

        if (!fsUri.endsWith("/"))
            fsUri += "/";

        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
        props.put(SECONDARY_FS_URI, fsUri);
    }

    /**
     * Convert IGFS path into Hadoop path.
     *
     * @param path IGFS path.
     * @return Hadoop path built from the scheme and authority of the wrapped file system URI.
     */
    private Path convert(IgfsPath path) {
        URI uri = fileSys.getUri();

        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
    }

    /**
     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
     *
     * @param e Exception to check.
     * @param detailMsg Detailed error message.
     * @return Appropriate exception.
     */
    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
            (e.getMessage() != null && e.getMessage().contains("Failed on local"));

        return !wrongVer ? cast(detailMsg, e) :
            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
                "version.", e);
    }

    /**
     * Cast IO exception to IGFS exception.
     *
     * @param msg Error message. Not used when {@code e} is a {@link FileNotFoundException} or
     *      a {@link PathIsNotEmptyDirectoryException}.
     * @param e IO exception.
     * @return IGFS exception.
     */
    public static IgfsException cast(String msg, IOException e) {
        if (e instanceof FileNotFoundException)
            return new IgfsFileNotFoundException(e);
        else if (e instanceof ParentNotDirectoryException)
            return new IgfsParentNotDirectoryException(msg, e);
        else if (e instanceof PathIsNotEmptyDirectoryException)
            return new IgfsDirectoryNotEmptyException(e);
        else if (e instanceof PathExistsException)
            return new IgfsPathAlreadyExistsException(msg, e);
        else
            return new IgfsException(msg, e);
    }

    /**
     * Convert Hadoop FileStatus properties to map.
     *
     * @param status File status.
     * @return IGFS attributes: permission (as 4-digit octal string), owner and group.
     */
    private static Map<String, String> properties(FileStatus status) {
        FsPermission perm = status.getPermission();

        if (perm == null)
            perm = FsPermission.getDefault();

        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
            PROP_GROUP_NAME, status.getGroup());
    }

    /**
     * Lists statuses of the entries located under the given path in the wrapped file system.
     *
     * @param path IGFS path to list.
     * @return Statuses, never {@code null}.
     * @throws IgfsFileNotFoundException If the path does not exist.
     * @throws IgfsException If listing failed for any other reason.
     */
    private FileStatus[] listStatuses(IgfsPath path) {
        try {
            FileStatus[] statuses = fileSys.listStatus(convert(path));

            if (statuses == null)
                throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);

            return statuses;
        }
        catch (FileNotFoundException ignored) {
            throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean exists(IgfsPath path) {
        try {
            return fileSys.exists(convert(path));
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);

        try {
            if (props0.userName() != null || props0.groupName() != null)
                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());

            if (props0.permission() != null)
                fileSys.setPermission(convert(path), props0.permission());
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
        }

        // Result is not used in case of secondary FS.
        return null;
    }

    /** {@inheritDoc} */
    @Override public void rename(IgfsPath src, IgfsPath dest) {
        // Delegate to the secondary file system.
        try {
            if (!fileSys.rename(convert(src), convert(dest)))
                throw new IgfsException("Failed to rename (secondary file system returned false) " +
                    "[src=" + src + ", dest=" + dest + ']');
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
        }
    }

    /** {@inheritDoc} */
    @Override public boolean delete(IgfsPath path, boolean recursive) {
        try {
            return fileSys.delete(convert(path), recursive);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public void mkdirs(IgfsPath path) {
        try {
            if (!fileSys.mkdirs(convert(path)))
                throw new IgniteException("Failed to make directories [path=" + path + "]");
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
        try {
            if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
        FileStatus[] statuses = listStatuses(path);

        Collection<IgfsPath> res = new ArrayList<>(statuses.length);

        for (FileStatus status : statuses)
            res.add(new IgfsPath(path, status.getPath().getName()));

        return res;
    }

    /** {@inheritDoc} */
    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
        FileStatus[] statuses = listStatuses(path);

        Collection<IgfsFile> res = new ArrayList<>(statuses.length);

        for (FileStatus status : statuses) {
            IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
                new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
                properties(status));

            res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override public IgfsReader open(IgfsPath path, int bufSize) {
        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
    }

    /** {@inheritDoc} */
    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
        try {
            return fileSys.create(convert(path), overwrite);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
        long blockSize, @Nullable Map<String, String> props) {
        IgfsHadoopFSProperties props0 =
            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());

        try {
            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
                null);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
                ", blockSize=" + blockSize + "]");
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * NOTE: this implementation delegates directly to {@link FileSystem#append(Path, int)}; the {@code create}
     * and {@code props} arguments are not used.
     */
    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
        @Nullable Map<String, String> props) {
        try {
            return fileSys.append(convert(path), bufSize);
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public IgfsFile info(final IgfsPath path) {
        try {
            final FileStatus status = fileSys.getFileStatus(convert(path));

            if (status == null)
                return null;

            final Map<String, String> props = properties(status);

            // Lightweight view over the Hadoop status; values are read from the status on every call.
            return new IgfsFile() {
                @Override public IgfsPath path() {
                    return path;
                }

                @Override public boolean isFile() {
                    return status.isFile();
                }

                @Override public boolean isDirectory() {
                    return status.isDirectory();
                }

                @Override public int blockSize() {
                    return (int)status.getBlockSize();
                }

                @Override public long groupBlockSize() {
                    return status.getBlockSize();
                }

                @Override public long accessTime() {
                    return status.getAccessTime();
                }

                @Override public long modificationTime() {
                    return status.getModificationTime();
                }

                @Override public String property(String name) throws IllegalArgumentException {
                    String val = props.get(name);

                    if (val == null)
                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');

                    return val;
                }

                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
                    String val = props.get(name);

                    return val == null ? dfltVal : val;
                }

                @Override public long length() {
                    return status.getLen();
                }

                /** {@inheritDoc} */
                @Override public Map<String, String> properties() {
                    return props;
                }
            };

        }
        catch (FileNotFoundException ignore) {
            // A missing file is reported as absent info rather than as an error.
            return null;
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
        }
    }

    /** {@inheritDoc} */
    @Override public long usedSpaceSize() {
        try {
            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
        }
        catch (IOException e) {
            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public Map<String, String> properties() {
        return props;
    }

    /** {@inheritDoc} */
    @Override public void close() throws IgniteCheckedException {
        try {
            fileSys.close();
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
    }
}