ignite-commits mailing list archives

From voze...@apache.org
Subject [47/50] [abbrv] incubator-ignite git commit: # IGNITE-386: Reworked API in Hadoop module.
Date Tue, 03 Mar 2015 11:07:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
deleted file mode 100644
index 5475cf4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
+++ /dev/null
@@ -1,1008 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs.hadoop.v2;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.igfs.hadoop.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.IgniteFs.*;
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-import static org.apache.ignite.igfs.IgfsMode.*;
-import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
-import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
-
-/**
- * {@code IGFS} Hadoop 2.x file system driver over the file system API. To use
- * {@code IGFS} as a Hadoop file system, configure this class
- * in Hadoop's {@code core-site.xml} as follows:
- * <pre name="code" class="xml">
- *  &lt;property&gt;
- *      &lt;name&gt;fs.default.name&lt;/name&gt;
- *      &lt;value&gt;igfs://ipc&lt;/value&gt;
- *  &lt;/property&gt;
- *
- *  &lt;property&gt;
- *      &lt;name&gt;fs.igfs.impl&lt;/name&gt;
- *      &lt;value&gt;org.apache.ignite.igfs.hadoop.IgfsHadoopFileSystem&lt;/value&gt;
- *  &lt;/property&gt;
- * </pre>
- * You should also add the Ignite JAR and all libraries to the Hadoop classpath. To
- * do this, add the following lines to the {@code conf/hadoop-env.sh} script in the Hadoop
- * distribution:
- * <pre name="code" class="bash">
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * </pre>
- * <h1 class="header">Data vs Clients Nodes</h1>
- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
- * data nodes. Client nodes are responsible for basic file system operations as well as
- * accessing data nodes remotely. Usually, client nodes are started together
- * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually
- * started together with Hadoop {@code task-tracker} processes.
- * <p>
- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml}
- * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation.
- */
-public class IgfsHadoopFileSystem extends AbstractFileSystem implements Closeable {
-    /** Logger. */
-    private static final Log LOG = LogFactory.getLog(IgfsHadoopFileSystem.class);
-
-    /** Ensures that close routine is invoked at most once. */
-    private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-    /** Grid remote client. */
-    private IgfsHadoopWrapper rmtClient;
-
-    /** Working directory. */
-    private IgfsPath workingDir;
-
-    /** URI. */
-    private URI uri;
-
-    /** Authority. */
-    private String uriAuthority;
-
-    /** Client logger. */
-    private IgfsLogger clientLog;
-
-    /** Server block size. */
-    private long grpBlockSize;
-
-    /** Default replication factor. */
-    private short dfltReplication;
-
-    /** Secondary URI string. */
-    private URI secondaryUri;
-
-    /** Mode resolver. */
-    private IgfsModeResolver modeRslvr;
-
-    /** Secondary file system instance. */
-    private AbstractFileSystem secondaryFs;
-
-    /** Whether custom sequential reads before prefetch value is provided. */
-    private boolean seqReadsBeforePrefetchOverride;
-
-    /** Custom-provided sequential reads before prefetch. */
-    private int seqReadsBeforePrefetch;
-
-    /** Flag that controls whether file writes should be colocated on data node. */
-    private boolean colocateFileWrites;
-
-    /** Prefer local writes. */
-    private boolean preferLocFileWrites;
-
-    /**
-     * @param name URI for file system.
-     * @param cfg Configuration.
-     * @throws URISyntaxException If name has invalid syntax.
-     * @throws IOException If initialization failed.
-     */
-    public IgfsHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
-        super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1);
-
-        uri = name;
-
-        try {
-            initialize(name, cfg);
-        }
-        catch (IOException e) {
-            // Close client if exception occurred.
-            if (rmtClient != null)
-                rmtClient.close(false);
-
-            throw e;
-        }
-
-        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkPath(Path path) {
-        URI uri = path.toUri();
-
-        if (uri.isAbsolute()) {
-            if (!F.eq(uri.getScheme(), IGFS_SCHEME))
-                throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
-                    uri.getScheme() + ']');
-
-            if (!F.eq(uri.getAuthority(), uriAuthority))
-                throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
-                    uri.getAuthority() + ']');
-        }
-    }
-
-    /**
-     * Public setter that can be used by direct users of FS or Visor.
-     *
-     * @param colocateFileWrites Whether all ongoing file writes should be colocated.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void colocateFileWrites(boolean colocateFileWrites) {
-        this.colocateFileWrites = colocateFileWrites;
-    }
-
-    /**
-     * Enter busy state.
-     *
-     * @throws IOException If file system is stopped.
-     */
-    private void enterBusy() throws IOException {
-        if (closeGuard.get())
-            throw new IOException("File system is stopped.");
-    }
-
-    /**
-     * Leave busy state.
-     */
-    private void leaveBusy() {
-        // No-op.
-    }
-
-    /**
-     * @param name URI passed to constructor.
-     * @param cfg Configuration passed to constructor.
-     * @throws IOException If initialization failed.
-     */
-    private void initialize(URI name, Configuration cfg) throws IOException {
-        enterBusy();
-
-        try {
-            if (rmtClient != null)
-                throw new IOException("File system is already initialized: " + rmtClient);
-
-            A.notNull(name, "name");
-            A.notNull(cfg, "cfg");
-
-            if (!IGFS_SCHEME.equals(name.getScheme()))
-                throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
-                    "://[name]/[optional_path], actual=" + name + ']');
-
-            uriAuthority = name.getAuthority();
-
-            // Override sequential reads before prefetch if needed.
-            seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
-
-            if (seqReadsBeforePrefetch > 0)
-                seqReadsBeforePrefetchOverride = true;
-
-            // In Ignite replication factor is controlled by data cache affinity.
-            // We use replication factor to force the whole file to be stored on local node.
-            dfltReplication = (short)cfg.getInt("dfs.replication", 3);
-
-            // Get file colocation control flag.
-            colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
-            preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
-
-            // Get log directory.
-            String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
-
-            File logDirFile = U.resolveIgnitePath(logDirCfg);
-
-            String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
-
-            rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG);
-
-            // Handshake.
-            IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
-
-            grpBlockSize = handshake.blockSize();
-
-            IgfsPaths paths = handshake.secondaryPaths();
-
-            Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
-
-            if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
-                // Initiate client logger.
-                if (logDir == null)
-                    throw new IOException("Failed to resolve log directory: " + logDirCfg);
-
-                Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
-
-                clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
-            }
-            else
-                clientLog = IgfsLogger.disabledLogger();
-
-            modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
-
-            boolean initSecondary = paths.defaultMode() == PROXY;
-
-            if (paths.pathModes() != null) {
-                for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
-                    IgfsMode mode = pathMode.getValue();
-
-                    initSecondary |= mode == PROXY;
-                }
-            }
-
-            if (initSecondary) {
-                Map<String, String> props = paths.properties();
-
-                String secUri = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_URI);
-                String secConfPath = props.get(IgfsHadoopFileSystemWrapper.SECONDARY_FS_CONFIG_PATH);
-
-                if (secConfPath == null)
-                    throw new IOException("Failed to connect to the secondary file system because configuration " +
-                            "path is not provided.");
-
-                if (secUri == null)
-                    throw new IOException("Failed to connect to the secondary file system because URI is not " +
-                            "provided.");
-
-                try {
-                    secondaryUri = new URI(secUri);
-
-                    URL secondaryCfgUrl = U.resolveIgniteUrl(secConfPath);
-
-                    if (secondaryCfgUrl == null)
-                        throw new IOException("Failed to resolve secondary file system config URL: " + secConfPath);
-
-                    Configuration conf = new Configuration();
-
-                    conf.addResource(secondaryCfgUrl);
-
-                    String prop = String.format("fs.%s.impl.disable.cache", secondaryUri.getScheme());
-
-                    conf.setBoolean(prop, true);
-
-                    secondaryFs = AbstractFileSystem.get(secondaryUri, conf);
-                }
-                catch (URISyntaxException ignore) {
-                    throw new IOException("Failed to resolve secondary file system URI: " + secUri);
-                }
-                catch (IOException e) {
-                    throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
-                }
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (rmtClient == null)
-                return;
-
-            rmtClient.close(false);
-
-            if (clientLog.isLogEnabled())
-                clientLog.close();
-
-            // Reset initialized resources.
-            rmtClient = null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public URI getUri() {
-        return uri;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getUriDefaultPort() {
-        return -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FsServerDefaults getServerDefaults() throws IOException {
-        return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024,
-            false, 0, DataChecksum.Type.NULL);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean setReplication(Path f, short replication) throws IOException {
-        return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
-        if (mode(f) == PROXY)
-            secondaryFs.setTimes(f, mtime, atime);
-        else {
-            if (mtime == -1 && atime == -1)
-                return;
-
-            rmtClient.setTimes(convert(f), atime, mtime);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FsStatus getFsStatus() throws IOException {
-        IgfsStatus status = rmtClient.fsStatus();
-
-        return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setPermission(Path p, FsPermission perm) throws IOException {
-        enterBusy();
-
-        try {
-            A.notNull(p, "p");
-
-            if (mode(p) == PROXY)
-                secondaryFs.setPermission(toSecondary(p), perm);
-            else {
-                if (rmtClient.update(convert(p), permission(perm)) == null)
-                    throw new IOException("Failed to set file permission (file not found?)" +
-                        " [path=" + p + ", perm=" + perm + ']');
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setOwner(Path p, String usr, String grp) throws IOException {
-        A.notNull(p, "p");
-        A.notNull(usr, "username");
-        A.notNull(grp, "grpName");
-
-        enterBusy();
-
-        try {
-            if (mode(p) == PROXY)
-                secondaryFs.setOwner(toSecondary(p), usr, grp);
-            else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
-                throw new IOException("Failed to set file permission (file not found?)" +
-                    " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
-
-                if (clientLog.isLogEnabled()) {
-                    // At this point we do not know the file size, so we perform an additional request to the remote FS to get it.
-                    FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
-
-                    long size = status != null ? status.getLen() : -1;
-
-                    long logId = IgfsLogger.nextId();
-
-                    clientLog.logOpen(logId, path, PROXY, bufSize, size);
-
-                    return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId));
-                }
-                else
-                    return is;
-            }
-            else {
-                IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ?
-                    rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
-
-                long logId = -1;
-
-                if (clientLog.isLogEnabled()) {
-                    logId = IgfsLogger.nextId();
-
-                    clientLog.logOpen(logId, path, mode, bufSize, stream.length());
-                }
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
-                        ", bufSize=" + bufSize + ']');
-
-                IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(),
-                    bufSize, LOG, clientLog, logId);
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
-
-                return new FSDataInputStream(igfsIn);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override public FSDataOutputStream createInternal(
-        Path f,
-        EnumSet<CreateFlag> flag,
-        FsPermission perm,
-        int bufSize,
-        short replication,
-        long blockSize,
-        Progressable progress,
-        Options.ChecksumOpt checksumOpt,
-        boolean createParent
-    ) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-        boolean append = flag.contains(CreateFlag.APPEND);
-        boolean create = flag.contains(CreateFlag.CREATE);
-
-        OutputStream out = null;
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" +
-                    path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
-
-            if (mode == PROXY) {
-                FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
-                    replication, blockSize, progress, checksumOpt, createParent);
-
-                if (clientLog.isLogEnabled()) {
-                    long logId = IgfsLogger.nextId();
-
-                    if (append)
-                        clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
-                    else
-                        clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
-
-                    return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId));
-                }
-                else
-                    return os;
-            }
-            else {
-                Map<String, String> permMap = F.asMap(PROP_PERMISSION, toString(perm),
-                    PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
-
-                // Create stream and close it in the 'finally' section if any sequential operation failed.
-                IgfsHadoopStreamDelegate stream;
-
-                long logId = -1;
-
-                if (append) {
-                    stream = rmtClient.append(path, create, permMap);
-
-                    if (clientLog.isLogEnabled()) {
-                        logId = IgfsLogger.nextId();
-
-                        clientLog.logAppend(logId, path, mode, bufSize);
-                    }
-
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
-                }
-                else {
-                    stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
-                        permMap);
-
-                    if (clientLog.isLogEnabled()) {
-                        logId = IgfsLogger.nextId();
-
-                        clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
-                    }
-
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
-                }
-
-                assert stream != null;
-
-                IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG,
-                    clientLog, logId);
-
-                bufSize = Math.max(64 * 1024, bufSize);
-
-                out = new BufferedOutputStream(igfsOut, bufSize);
-
-                FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
-
-                // Mark stream created successfully.
-                out = null;
-
-                return res;
-            }
-        }
-        finally {
-            // Close if failed during stream creation.
-            if (out != null)
-                U.closeQuiet(out);
-
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsSymlinks() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void renameInternal(Path src, Path dst) throws IOException {
-        A.notNull(src, "src");
-        A.notNull(dst, "dst");
-
-        enterBusy();
-
-        try {
-            IgfsPath srcPath = convert(src);
-            IgfsPath dstPath = convert(dst);
-            Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(srcPath);
-
-            if (childrenModes.contains(PROXY)) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logRename(srcPath, PROXY, dstPath);
-
-                secondaryFs.renameInternal(toSecondary(src), toSecondary(dst));
-            }
-
-            rmtClient.rename(srcPath, dstPath);
-
-            if (clientLog.isLogEnabled())
-                clientLog.logRename(srcPath, modeRslvr.resolveMode(srcPath), dstPath);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-            Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path);
-
-            if (childrenModes.contains(PROXY)) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logDelete(path, PROXY, recursive);
-
-                return secondaryFs.delete(toSecondary(f), recursive);
-            }
-
-            boolean res = rmtClient.delete(path, recursive);
-
-            if (clientLog.isLogEnabled())
-                clientLog.logDelete(path, mode, recursive);
-
-            return res;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
-        // Checksum has effect for secondary FS only.
-        if (secondaryFs != null)
-            secondaryFs.setVerifyChecksum(verifyChecksum);
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileChecksum getFileChecksum(Path f) throws IOException {
-        if (mode(f) == PROXY)
-            return secondaryFs.getFileChecksum(f);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
-
-                if (arr == null)
-                    throw new FileNotFoundException("File " + f + " does not exist.");
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = toPrimary(arr[i]);
-
-                if (clientLog.isLogEnabled()) {
-                    String[] fileArr = new String[arr.length];
-
-                    for (int i = 0; i < arr.length; i++)
-                        fileArr[i] = arr[i].getPath().toString();
-
-                    clientLog.logListDirectory(path, PROXY, fileArr);
-                }
-
-                return arr;
-            }
-            else {
-                Collection<IgfsFile> list = rmtClient.listFiles(path);
-
-                if (list == null)
-                    throw new FileNotFoundException("File " + f + " does not exist.");
-
-                List<IgfsFile> files = new ArrayList<>(list);
-
-                FileStatus[] arr = new FileStatus[files.size()];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = convert(files.get(i));
-
-                if (clientLog.isLogEnabled()) {
-                    String[] fileArr = new String[arr.length];
-
-                    for (int i = 0; i < arr.length; i++)
-                        fileArr[i] = arr[i].getPath().toString();
-
-                    clientLog.logListDirectory(path, mode, fileArr);
-                }
-
-                return arr;
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, PROXY);
-
-                secondaryFs.mkdir(toSecondary(f), perm, createParent);
-            }
-            else {
-                rmtClient.mkdirs(path, permission(perm));
-
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, mode);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            if (mode(f) == PROXY)
-                return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
-            else {
-                IgfsFile info = rmtClient.info(convert(f));
-
-                if (info == null)
-                    throw new FileNotFoundException("File not found: " + f);
-
-                return convert(info);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
-        A.notNull(path, "path");
-
-        IgfsPath igfsPath = convert(path);
-
-        enterBusy();
-
-        try {
-            if (modeRslvr.resolveMode(igfsPath) == PROXY)
-                return secondaryFs.getFileBlockLocations(path, start, len);
-            else {
-                long now = System.currentTimeMillis();
-
-                List<IgfsBlockLocation> affinity = new ArrayList<>(
-                    rmtClient.affinity(igfsPath, start, len));
-
-                BlockLocation[] arr = new BlockLocation[affinity.size()];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = convert(affinity.get(i));
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
-                        (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
-
-                return arr;
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Resolve path mode.
-     *
-     * @param path HDFS path.
-     * @return Path mode.
-     */
-    public IgfsMode mode(Path path) {
-        return modeRslvr.resolveMode(convert(path));
-    }
-
-    /**
-     * Convert the given path to path acceptable by the primary file system.
-     *
-     * @param path Path.
-     * @return Primary file system path.
-     */
-    private Path toPrimary(Path path) {
-        return convertPath(path, getUri());
-    }
-
-    /**
-     * Convert the given path to path acceptable by the secondary file system.
-     *
-     * @param path Path.
-     * @return Secondary file system path.
-     */
-    private Path toSecondary(Path path) {
-        assert secondaryFs != null;
-        assert secondaryUri != null;
-
-        return convertPath(path, secondaryUri);
-    }
-
-    /**
-     * Convert path using the given new URI.
-     *
-     * @param path Old path.
-     * @param newUri New URI.
-     * @return New path.
-     */
-    private Path convertPath(Path path, URI newUri) {
-        assert newUri != null;
-
-        if (path != null) {
-            URI pathUri = path.toUri();
-
-            try {
-                return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
-                    pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
-            }
-            catch (URISyntaxException e) {
-                throw new IgniteException("Failed to construct secondary file system path from the primary file " +
-                    "system path: " + path, e);
-            }
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Convert a file status obtained from the secondary file system to a status of the primary file system.
-     *
-     * @param status Secondary file system status.
-     * @return Primary file system status.
-     */
-    private FileStatus toPrimary(FileStatus status) {
-        return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
-            status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
-            status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        return new Path(IGFS_SCHEME, uriAuthority, path.toString());
-    }
-
-    /**
-     * Convert Hadoop path into IGFS path.
-     *
-     * @param path Hadoop path.
-     * @return IGFS path.
-     */
-    @Nullable private IgfsPath convert(Path path) {
-        if (path == null)
-            return null;
-
-        return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(workingDir, path.toUri().getPath());
-    }
-
-    /**
-     * Convert IGFS affinity block location into Hadoop affinity block location.
-     *
-     * @param block IGFS affinity block location.
-     * @return Hadoop affinity block location.
-     */
-    private BlockLocation convert(IgfsBlockLocation block) {
-        Collection<String> names = block.names();
-        Collection<String> hosts = block.hosts();
-
-        return new BlockLocation(
-            names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
-            hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
-            block.start(), block.length()
-        ) {
-            @Override public String toString() {
-                try {
-                    return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
-                        ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    /**
-     * Convert IGFS file information into Hadoop file status.
-     *
-     * @param file IGFS file information.
-     * @return Hadoop file status.
-     */
-    private FileStatus convert(IgfsFile file) {
-        return new FileStatus(
-            file.length(),
-            file.isDirectory(),
-            dfltReplication,
-            file.groupBlockSize(),
-            file.modificationTime(),
-            file.accessTime(),
-            permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME),
-            file.property(PROP_GROUP_NAME, "users"),
-            convert(file.path())) {
-            @Override public String toString() {
-                return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
-            }
-        };
-    }
-
-    /**
-     * Convert Hadoop permission into IGFS file attribute.
-     *
-     * @param perm Hadoop permission.
-     * @return IGFS attributes.
-     */
-    private Map<String, String> permission(FsPermission perm) {
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        return F.asMap(PROP_PERMISSION, toString(perm));
-    }
-
-    /**
-     * @param perm Permission.
-     * @return String.
-     */
-    private static String toString(FsPermission perm) {
-        return String.format("%04o", perm.toShort());
-    }
-
-    /**
-     * Convert IGFS file attributes into Hadoop permission.
-     *
-     * @param file File info.
-     * @return Hadoop permission.
-     */
-    private FsPermission permission(IgfsFile file) {
-        String perm = file.property(PROP_PERMISSION, null);
-
-        if (perm == null)
-            return FsPermission.getDefault();
-
-        try {
-            return new FsPermission((short)Integer.parseInt(perm, 8));
-        }
-        catch (NumberFormatException ignore) {
-            return FsPermission.getDefault();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsHadoopFileSystem.class, this);
-    }
-}
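
The deleted driver above persists POSIX permissions as a four-digit octal string in IGFS file properties (see the private toString(FsPermission) and permission(IgfsFile) helpers). Below is a minimal, self-contained sketch of that round-trip; it assumes only the Hadoop FsPermission API, and the class name and sample mode are illustrative, not part of this commit.

import org.apache.hadoop.fs.permission.FsPermission;

public class PermissionRoundTrip {
    public static void main(String[] args) {
        FsPermission perm = new FsPermission((short)0644);

        // Serialize the way the driver stores PROP_PERMISSION: four-digit octal.
        String prop = String.format("%04o", perm.toShort()); // -> "0644"

        // Parse it back, falling back to the default on malformed input,
        // mirroring permission(IgfsFile) above.
        FsPermission parsed;

        try {
            parsed = new FsPermission((short)Integer.parseInt(prop, 8));
        }
        catch (NumberFormatException ignore) {
            parsed = FsPermission.getDefault();
        }

        System.out.println(perm + " -> " + prop + " -> " + parsed);
    }
}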

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html
deleted file mode 100644
index 6df66f4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains Hadoop 2.x <code>FileSystem</code> wrapper for Ignite file system.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html
deleted file mode 100644
index ec38a21..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains <b>IG</b>nite <b>F</b>ile <b>S</b>ystem APIs.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java
new file mode 100644
index 0000000..bdf8fc6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.counters.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop counter group adapter.
+ */
+class HadoopCounterGroup implements CounterGroup {
+    /** Counters. */
+    private final HadoopCounters cntrs;
+
+    /** Group name. */
+    private final String name;
+
+    /**
+     * Creates new instance.
+     *
+     * @param cntrs Client counters instance.
+     * @param name Group name.
+     */
+    HadoopCounterGroup(HadoopCounters cntrs, String name) {
+        this.cntrs = cntrs;
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addCounter(Counter counter) {
+        addCounter(counter.getName(), counter.getDisplayName(), 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter addCounter(String name, String displayName, long value) {
+        final Counter counter = cntrs.findCounter(this.name, name);
+
+        counter.setValue(value);
+
+        return counter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, String displayName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName, boolean create) {
+        return cntrs.findCounter(name, counterName, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String counterName) {
+        return cntrs.findCounter(name, counterName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return cntrs.groupSize(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+        for (final Counter counter : rightGroup)
+            cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<Counter> iterator() {
+        return cntrs.iterateGroup(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
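
A hypothetical consumer of the adapter above, using only the public Hadoop CounterGroup API (the group is Iterable over Counter); the dump helper is illustrative and not part of this commit.

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;

public class CounterGroupDump {
    /** Prints every counter in a group, e.g. one obtained via HadoopCounters.getGroup(). */
    static void dump(CounterGroup grp) {
        System.out.println("Group: " + grp.getDisplayName() + " (" + grp.size() + " counters)");

        for (Counter c : grp)
            System.out.println("  " + c.getName() + " = " + c.getValue());
    }
}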

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
new file mode 100644
index 0000000..c7f0157
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.counters.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop counters adapter.
+ */
+public class HadoopCounters extends Counters {
+    /** */
+    private final Map<T2<String,String>,GridHadoopLongCounter> cntrs = new HashMap<>();
+
+    /**
+     * Creates new instance based on given counters.
+     *
+     * @param cntrs Counters to adapt.
+     */
+    public HadoopCounters(GridHadoopCounters cntrs) {
+        for (GridHadoopCounter cntr : cntrs.all())
+            if (cntr instanceof GridHadoopLongCounter)
+                this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (GridHadoopLongCounter) cntr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+        return addGroup(grp.getName(), grp.getDisplayName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CounterGroup addGroup(String name, String displayName) {
+        return new HadoopCounterGroup(this, name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter findCounter(String grpName, String cntrName) {
+        return findCounter(grpName, cntrName, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Counter findCounter(Enum<?> key) {
+        return findCounter(key.getDeclaringClass().getName(), key.name(), true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
+        return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Iterable<String> getGroupNames() {
+        Collection<String> res = new HashSet<>();
+
+        for (GridHadoopCounter counter : cntrs.values())
+            res.add(counter.group());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<CounterGroup> iterator() {
+        final Iterator<String> iter = getGroupNames().iterator();
+
+        return new Iterator<CounterGroup>() {
+            @Override public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override public CounterGroup next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                return new HadoopCounterGroup(HadoopCounters.this, iter.next());
+            }
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException("not implemented");
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized CounterGroup getGroup(String grpName) {
+        return new HadoopCounterGroup(this, grpName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int countCounters() {
+        return cntrs.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+        for (CounterGroup group : other) {
+            for (Counter counter : group) {
+                findCounter(group.getName(), counter.getName()).increment(counter.getValue());
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object genericRight) {
+        if (!(genericRight instanceof HadoopCounters))
+            return false;
+
+        return cntrs.equals(((HadoopCounters) genericRight).cntrs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrs.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWriteAllCounters(boolean snd) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean getWriteAllCounters() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Limits limits() {
+        return null;
+    }
+
+    /**
+     * Returns the size of a group.
+     *
+     * @param grpName Name of the group.
+     * @return Number of counters in the given group.
+     */
+    public int groupSize(String grpName) {
+        int res = 0;
+
+        for (GridHadoopCounter counter : cntrs.values()) {
+            if (grpName.equals(counter.group()))
+                res++;
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns counters iterator for specified group.
+     *
+     * @param grpName Name of the group to iterate.
+     * @return Counters iterator.
+     */
+    public Iterator<Counter> iterateGroup(String grpName) {
+        Collection<Counter> grpCounters = new ArrayList<>();
+
+        for (GridHadoopLongCounter counter : cntrs.values()) {
+            if (grpName.equals(counter.group()))
+                grpCounters.add(new GridHadoopV2Counter(counter));
+        }
+
+        return grpCounters.iterator();
+    }
+
+    /**
+     * Find a counter in the group.
+     *
+     * @param grpName The name of the counter group.
+     * @param cntrName The name of the counter.
+     * @param create Whether to create the counter if it is not found.
+     * @return The counter that was found or created, or {@code null} if it was not found and {@code create} is {@code false}.
+     */
+    public Counter findCounter(String grpName, String cntrName, boolean create) {
+        T2<String, String> key = new T2<>(grpName, cntrName);
+
+        GridHadoopLongCounter internalCntr = cntrs.get(key);
+
+        if (internalCntr == null && create) {
+            internalCntr = new GridHadoopLongCounter(grpName, cntrName);
+
+            cntrs.put(key, internalCntr);
+        }
+
+        return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);
+    }
+}
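
A short sketch of the create-flag semantics of findCounter(grpName, cntrName, create) above. The readOrZero helper and the group/counter names are illustrative; the HadoopCounters instance is assumed to be built from a finished job's counters.

import org.apache.hadoop.mapreduce.Counter;
import org.apache.ignite.internal.processors.hadoop.HadoopCounters;

public class CountersUsage {
    /** Reads a counter value without materializing missing counters. */
    static long readOrZero(HadoopCounters cntrs, String grp, String name) {
        // With create == false a missing counter yields null instead of a new instance.
        Counter c = cntrs.findCounter(grp, name, false);

        return c == null ? 0 : c.getValue();
    }
}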

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
index 52e7d29..e1bf9b6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.igfs.hadoop.v1.*;
+import org.apache.ignite.hadoop.fs.v1.*;
 
 /**
  * Utilities for configuring file systems to support the separate working directory per each thread.
@@ -36,8 +36,8 @@ public class GridHadoopFileSystemsUtils {
      * @param userName User name.
      */
     public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgfsHadoopFileSystem)
-            ((IgfsHadoopFileSystem)fs).setUser(userName);
+        if (fs instanceof IgniteHadoopFileSystem)
+            ((IgniteHadoopFileSystem)fs).setUser(userName);
         else if (fs instanceof GridHadoopDistributedFileSystem)
             ((GridHadoopDistributedFileSystem)fs).setUser(userName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
new file mode 100644
index 0000000..36fabb7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+/**
+ * This class lists parameters that can be specified in the Hadoop configuration.
+ * The Hadoop configuration can be specified in the {@code core-site.xml} file
+ * or passed directly to a map-reduce task when using the Hadoop driver for the IGFS file system:
+ * <ul>
+ *     <li>
+ *         {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
+ *         the one specified in {@link org.apache.ignite.configuration.IgfsConfiguration#getSequentialReadsBeforePrefetch()}
+ *         IGFS data node configuration property.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.enabled} - specifies whether the IGFS sampling logger is enabled. If
+ *         {@code true}, all file system operations will be logged to a file.
+ *     </li>
+ *     <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
+ *         it gets flushed to the log file. Higher values improve performance but increase the delay before a
+ *         record appears in the log file.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on the
+ *         data node to which the client is connected. If {@code true}, the file will not be distributed and will
+ *         be written to a single data node. Default value is {@code true}.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.prefer.local.writes} - specifies whether a file should preferably be written to the
+ *         local data node if it has enough free space. After some time it can still be redistributed across nodes.
+ *     </li>
+ * </ul>
+ * Here {@code [name]} is the file system endpoint that you specify in the authority part of the file system URI.
+ * E.g., if your file system URI is {@code igfs://127.0.0.1:10500}, then {@code [name]} is {@code 127.0.0.1:10500}.
+ * <p>
+ * Sample configuration that can be placed to {@code core-site.xml} file:
+ * <pre name="code" class="xml">
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.enabled&lt;/name&gt;
+ *         &lt;value&gt;true&lt;/value&gt;
+ *     &lt;/property&gt;
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.dir&lt;/name&gt;
+ *         &lt;value&gt;/home/apache/ignite/log/sampling&lt;/value&gt;
+ *     &lt;/property&gt;
+ *     &lt;property&gt;
+ *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.batch_size&lt;/name&gt;
+ *         &lt;value&gt;16&lt;/value&gt;
+ *     &lt;/property&gt;
+ * </pre>
+ * Parameters can also be specified per map-reduce job, e.g.
+ * <pre name="code" class="bash">
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
+ * </pre>
+ * If you want to use these parameters in code, then you have to substitute your file system name into them. The
+ * easiest way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
+ */
+public class HadoopParameters {
+    /** Parameter name for control over file colocation write mode. */
+    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
+
+    /** Parameter name for custom sequential reads before prefetch value. */
+    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
+        "fs.igfs.%s.open.sequential_reads_before_prefetch";
+
+    /** Parameter name for client logger directory. */
+    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
+
+    /** Parameter name for log batch size. */
+    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
+
+    /** Parameter name for log enabled flag. */
+    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
+
+    /** Parameter name for prefer local writes flag. */
+    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
+}
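
As a quick illustration of the String.format idiom described in the Javadoc above, here is a minimal sketch of setting these parameters from client code. The wrapper class is invented for the example; the endpoint name and values are taken from the sample configuration in the Javadoc, not defaults:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters;

    public class HadoopParametersExample {
        public static void main(String[] args) {
            // Endpoint name as it appears in the authority part of the file
            // system URI (illustrative value from the Javadoc sample).
            String name = "127.0.0.1:10500";

            Configuration conf = new Configuration();

            // Substitute the endpoint name into the parameter templates.
            conf.setBoolean(String.format(HadoopParameters.PARAM_IGFS_LOG_ENABLED, name), true);
            conf.set(String.format(HadoopParameters.PARAM_IGFS_LOG_DIR, name), "/home/apache/ignite/log/sampling");
            conf.setInt(String.format(HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE, name), 16);

            // This one is a global flag and takes no endpoint name.
            conf.setBoolean(HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES, true);
        }
    }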

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
new file mode 100644
index 0000000..4c83ace
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.protocol.*;
+import org.apache.hadoop.mapreduce.security.token.delegation.*;
+import org.apache.hadoop.mapreduce.v2.*;
+import org.apache.hadoop.mapreduce.v2.jobhistory.*;
+import org.apache.hadoop.security.*;
+import org.apache.hadoop.security.authorize.*;
+import org.apache.hadoop.security.token.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+
+/**
+ * Hadoop client protocol.
+ */
+public class HadoopClientProtocol implements ClientProtocol {
+    /** Ignite framework name property. */
+    public static final String FRAMEWORK_NAME = "ignite";
+
+    /** Protocol version. */
+    private static final long PROTO_VER = 1L;
+
+    /** Default Ignite system directory. */
+    private static final String SYS_DIR = ".ignite/system";
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Ignite client. */
+    private volatile GridClient cli;
+
+    /** Last received version. */
+    private long lastVer = -1;
+
+    /** Last received status. */
+    private GridHadoopJobStatus lastStatus;
+
+    /**
+     * Constructor.
+     *
+     * @param conf Configuration.
+     * @param cli Ignite client.
+     */
+    public HadoopClientProtocol(Configuration conf, GridClient cli) {
+        assert cli != null;
+
+        this.conf = conf;
+        this.cli = cli;
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobID getNewJobID() throws IOException, InterruptedException {
+        try {
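+            // Stamp the request time into the job configuration; the matching
+            // response stamp is set below once the task completes.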
+            conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
+
+            GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null);
+
+            conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
+
+            return new JobID(jobID.globalId().toString(), jobID.localId());
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to get new job ID.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
+        InterruptedException {
+        try {
+            conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
+
+            GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(),
+                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
+
+            if (status == null)
+                throw new IOException("Failed to submit job (null status obtained): " + jobId);
+
+            return processStatus(status);
+        }
+        catch (GridClientException | IgniteCheckedException e) {
+            throw new IOException("Failed to submit job.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
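+        // Return fixed placeholder metrics; real cluster state is not reported through this protocol.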
+        return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
+        return Cluster.JobTrackerStatus.RUNNING;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
+        return new AccessControlList("*");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
+        try {
+            cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(),
+                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to kill job: " + jobId, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
+        InterruptedException {
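+        // Killing individual task attempts is not supported; report failure to the caller.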
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
+        try {
+            long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
+
+            GridHadoopProtocolTaskArguments args = delay >= 0 ?
+                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
+                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
+
+            GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args);
+
+            if (status == null)
+                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
+
+            return processStatus(status);
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to get job status: " + jobId, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
+        try {
+            final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(),
+                new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+
+            if (counters == null)
+                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
+
+            return new HadoopCounters(counters);
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to get job counters: " + jobId, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
+        return new TaskReport[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getFilesystemName() throws IOException, InterruptedException {
+        return FileSystem.get(conf).getUri().toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+        return new JobStatus[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+        throws IOException, InterruptedException {
+        return new TaskCompletionEvent[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
+        return new String[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
+        return new TaskTrackerInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
+        return new TaskTrackerInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSystemDir() throws IOException, InterruptedException {
+        Path sysDir = new Path(SYS_DIR);
+
+        return sysDir.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStagingAreaDir() throws IOException, InterruptedException {
+        String usr = UserGroupInformation.getCurrentUser().getShortUserName();
+
+        return GridHadoopUtils.stagingAreaDir(conf, usr).toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getJobHistoryDir() throws IOException, InterruptedException {
+        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
+        return new QueueInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
+        return new QueueAclsInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+        return new QueueInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
+        return new QueueInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
+        InterruptedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
+        InterruptedException {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
+        InterruptedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
+        InterruptedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+        return PROTO_VER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
+        throws IOException {
+        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
+    }
+
+    /**
+     * Process received status update.
+     *
+     * @param status Ignite status.
+     * @return Hadoop status.
+     */
+    private JobStatus processStatus(GridHadoopJobStatus status) {
+        // IMPORTANT! This method only works in a single-threaded environment. That holds at the moment because
+        // IgniteHadoopClientProtocolProvider creates a new instance of this class for every new job, and the Job class
+        // serializes invocations of the submitJob() and getJobStatus() methods. However, if either of these conditions
+        // changes in the future, and the protocol serves statuses for several jobs or status updates are no longer
+        // serialized, then we will have to fall back to a concurrent approach (e.g. using a ConcurrentHashMap).
+        // (vozerov)
+        if (lastVer < status.version()) {
+            lastVer = status.version();
+
+            lastStatus = status;
+        }
+        else
+            assert lastStatus != null;
+
+        return GridHadoopUtils.status(lastStatus, conf);
+    }
+}
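
For orientation, a hedged sketch of how a job might be routed through this protocol. The framework name must match FRAMEWORK_NAME above; the property keys are standard Hadoop ones, and the assumption that the Ignite protocol provider reads the jobtracker address property, along with the host/port and job name, is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class IgniteJobSubmissionSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Select the Ignite framework; this must match
            // HadoopClientProtocol.FRAMEWORK_NAME for the protocol provider
            // to create this protocol for the job.
            conf.set("mapreduce.framework.name", "ignite");

            // Address of an Ignite node accepting client connections
            // (illustrative value; assumed to be read by the provider).
            conf.set("mapreduce.jobtracker.address", "127.0.0.1:11211");

            Job job = Job.getInstance(conf, "sample-job");

            // Mapper/reducer/input/output setup omitted. Submission then goes
            // through HadoopClientProtocol.getNewJobID() and submitJob().
            // job.waitForCompletion(true);
        }
    }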

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
index 41bd24a..82be91f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.*;
 import java.io.*;
 import java.util.*;
 
-import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d46deb2/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
deleted file mode 100644
index 007172a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite_new/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite_new.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite_new.filesystem.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
- */
-public class IgniteHadoopSecondaryFileSystem implements SecondaryFileSystem, AutoCloseable {
-    /** Property name for path to Hadoop configuration. */
-    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
-
-    /** Property name for URI of file system. */
-    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
-
-    /** Hadoop file system. */
-    private final FileSystem fileSys;
-
-    /** Properties of file system */
-    private final Map<String, String> props = new HashMap<>();
-
-    /**
-     * Constructor.
-     *
-     * @param uri URI of file system.
-     * @param cfgPath Additional path to Hadoop configuration.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
-     */
-    public IgniteHadoopSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
-        Configuration cfg = new Configuration();
-
-        if (cfgPath != null)
-            cfg.addResource(U.resolveIgniteUrl(cfgPath));
-
-        try {
-            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
-        }
-        catch (IOException | URISyntaxException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        uri = fileSys.getUri().toString();
-
-        if (!uri.endsWith("/"))
-            uri += "/";
-
-        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
-        props.put(SECONDARY_FS_URI, uri);
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        URI uri = fileSys.getUri();
-
-        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
-    }
-
-    /**
-     * Heuristically checks if the exception was caused by an invalid HDFS version and returns an appropriate exception.
-     *
-     * @param e Exception to check.
-     * @param detailMsg Detailed error message.
-     * @return Appropriate exception.
-     */
-    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
-            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
-        IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) :
-            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
-                "version.", e);
-
-        return igfsErr;
-    }
-
-    /**
-     * Cast IO exception to IGFS exception.
-     *
-     * @param msg Error message.
-     * @param e IO exception.
-     * @return IGFS exception.
-     */
-    public static IgfsException cast(String msg, IOException e) {
-        if (e instanceof FileNotFoundException)
-            return new IgfsFileNotFoundException(e);
-        else if (e instanceof ParentNotDirectoryException)
-            return new IgfsParentNotDirectoryException(msg, e);
-        else if (e instanceof PathIsNotEmptyDirectoryException)
-            return new IgfsDirectoryNotEmptyException(e);
-        else if (e instanceof PathExistsException)
-            return new IgfsPathAlreadyExistsException(msg, e);
-        else
-            return new IgfsException(msg, e);
-    }
-
-    /**
-     * Convert Hadoop FileStatus properties to map.
-     *
-     * @param status File status.
-     * @return IGFS attributes.
-     */
-    private static Map<String, String> properties(FileStatus status) {
-        FsPermission perm = status.getPermission();
-
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
-            PROP_GROUP_NAME, status.getGroup());
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean exists(IgfsPath path) {
-        try {
-            return fileSys.exists(convert(path));
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
-        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
-
-        try {
-            if (props0.userName() != null || props0.groupName() != null)
-                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
-            if (props0.permission() != null)
-                fileSys.setPermission(convert(path), props0.permission());
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
-        }
-
-        // Result is not used in case of secondary FS.
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void rename(IgfsPath src, IgfsPath dest) {
-        // Delegate to the secondary file system.
-        try {
-            if (!fileSys.rename(convert(src), convert(dest)))
-                throw new IgfsException("Failed to rename (secondary file system returned false) " +
-                    "[src=" + src + ", dest=" + dest + ']');
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(IgfsPath path, boolean recursive) {
-        try {
-            return fileSys.delete(convert(path), recursive);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(IgfsPath path) {
-        try {
-            if (!fileSys.mkdirs(convert(path)))
-                throw new IgniteException("Failed to make directories [path=" + path + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
-        try {
-            if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission()))
-                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
-
-            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus status : statuses)
-                res.add(new IgfsPath(path, status.getPath().getName()));
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
-
-            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus status : statuses) {
-                IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
-                    new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
-                    properties(status));
-
-                res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
-            }
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsReader open(IgfsPath path, int bufSize) {
-        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
-        try {
-            return fileSys.create(convert(path), overwrite);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
-        long blockSize, @Nullable Map<String, String> props) {
-        IgfsHadoopFSProperties props0 =
-            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
-
-        try {
-            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
-                null);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
-                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
-                ", blockSize=" + blockSize + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
-        @Nullable Map<String, String> props) {
-        try {
-            return fileSys.append(convert(path), bufSize);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) {
-        try {
-            final FileStatus status = fileSys.getFileStatus(convert(path));
-
-            if (status == null)
-                return null;
-
-            final Map<String, String> props = properties(status);
-
-            return new IgfsFile() {
-                @Override public IgfsPath path() {
-                    return path;
-                }
-
-                @Override public boolean isFile() {
-                    return status.isFile();
-                }
-
-                @Override public boolean isDirectory() {
-                    return status.isDirectory();
-                }
-
-                @Override public int blockSize() {
-                    return (int)status.getBlockSize();
-                }
-
-                @Override public long groupBlockSize() {
-                    return status.getBlockSize();
-                }
-
-                @Override public long accessTime() {
-                    return status.getAccessTime();
-                }
-
-                @Override public long modificationTime() {
-                    return status.getModificationTime();
-                }
-
-                @Override public String property(String name) throws IllegalArgumentException {
-                    String val = props.get(name);
-
-                    if (val == null)
-                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
-
-                    return val;
-                }
-
-                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
-                    String val = props.get(name);
-
-                    return val == null ? dfltVal : val;
-                }
-
-                @Override public long length() {
-                    return status.getLen();
-                }
-
-                /** {@inheritDoc} */
-                @Override public Map<String, String> properties() {
-                    return props;
-                }
-            };
-
-        }
-        catch (FileNotFoundException ignore) {
-            return null;
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long usedSpaceSize() {
-        try {
-            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Map<String, String> properties() {
-        return props;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        try {
-            fileSys.close();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

