ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [36/65] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.
Date Thu, 05 Mar 2015 19:08:48 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
deleted file mode 100644
index 0759203..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java
+++ /dev/null
@@ -1,982 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.igfs.hadoop.v2;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.IgniteFs.*;
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-import static org.apache.ignite.igfs.IgfsMode.*;
-import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
-import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
-
-/**
- * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
- * {@code IGFS} as Hadoop file system, you should configure this class
- * in Hadoop's {@code core-site.xml} as follows:
- * <pre name="code" class="xml">
- *  &lt;property&gt;
- *      &lt;name&gt;fs.default.name&lt;/name&gt;
- *      &lt;value&gt;igfs://ipc&lt;/value&gt;
- *  &lt;/property&gt;
- *
- *  &lt;property&gt;
- *      &lt;name&gt;fs.igfs.impl&lt;/name&gt;
- *      &lt;value&gt;org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem&lt;/value&gt;
- *  &lt;/property&gt;
- * </pre>
- * You should also add Ignite JAR and all libraries to Hadoop classpath. To
- * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop
- * distribution:
- * <pre name="code" class="bash">
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * </pre>
- * <h1 class="header">Data vs Clients Nodes</h1>
- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
- * data nodes. Client nodes are responsible for basic file system operations as well as
- * accessing data nodes remotely. Usually, client nodes are started together
- * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually
- * started together with Hadoop {@code task-tracker} processes.
- * <p>
- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml}
- * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation.
- */
-public class IgfsHadoopFileSystem extends AbstractFileSystem implements Closeable {
-    /** Logger. */
-    private static final Log LOG = LogFactory.getLog(IgfsHadoopFileSystem.class);
-
-    /** Ensures that close routine is invoked at most once. */
-    private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-    /** Grid remote client. */
-    private IgfsHadoopWrapper rmtClient;
-
-    /** Working directory. */
-    private IgfsPath workingDir;
-
-    /** URI. */
-    private URI uri;
-
-    /** Authority. */
-    private String uriAuthority;
-
-    /** Client logger. */
-    private IgfsLogger clientLog;
-
-    /** Server block size. */
-    private long grpBlockSize;
-
-    /** Default replication factor. */
-    private short dfltReplication;
-
-    /** Secondary URI string. */
-    private URI secondaryUri;
-
-    /** Mode resolver. */
-    private IgfsModeResolver modeRslvr;
-
-    /** Secondary file system instance. */
-    private AbstractFileSystem secondaryFs;
-
-    /** Whether custom sequential reads before prefetch value is provided. */
-    private boolean seqReadsBeforePrefetchOverride;
-
-    /** Custom-provided sequential reads before prefetch. */
-    private int seqReadsBeforePrefetch;
-
-    /** Flag that controls whether file writes should be colocated on data node. */
-    private boolean colocateFileWrites;
-
-    /** Prefer local writes. */
-    private boolean preferLocFileWrites;
-
-    /**
-     * @param name URI for file system.
-     * @param cfg Configuration.
-     * @throws URISyntaxException if name has invalid syntax.
-     * @throws IOException If initialization failed.
-     */
-    public IgfsHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
-        super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1);
-
-        uri = name;
-
-        try {
-            initialize(name, cfg);
-        }
-        catch (IOException e) {
-            // Close client if exception occurred.
-            if (rmtClient != null)
-                rmtClient.close(false);
-
-            throw e;
-        }
-
-        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void checkPath(Path path) {
-        URI uri = path.toUri();
-
-        if (uri.isAbsolute()) {
-            if (!F.eq(uri.getScheme(), IGFS_SCHEME))
-                throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
-                    uri.getAuthority() + ']');
-
-            if (!F.eq(uri.getAuthority(), uriAuthority))
-                throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
-                    uri.getAuthority() + ']');
-        }
-    }
-
-    /**
-     * Public setter that can be used by direct users of FS or Visor.
-     *
-     * @param colocateFileWrites Whether all ongoing file writes should be colocated.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void colocateFileWrites(boolean colocateFileWrites) {
-        this.colocateFileWrites = colocateFileWrites;
-    }
-
-    /**
-     * Enter busy state.
-     *
-     * @throws IOException If file system is stopped.
-     */
-    private void enterBusy() throws IOException {
-        if (closeGuard.get())
-            throw new IOException("File system is stopped.");
-    }
-
-    /**
-     * Leave busy state.
-     */
-    private void leaveBusy() {
-        // No-op.
-    }
-
-    /**
-     * @param name URI passed to constructor.
-     * @param cfg Configuration passed to constructor.
-     * @throws IOException If initialization failed.
-     */
-    private void initialize(URI name, Configuration cfg) throws IOException {
-        enterBusy();
-
-        try {
-            if (rmtClient != null)
-                throw new IOException("File system is already initialized: " + rmtClient);
-
-            A.notNull(name, "name");
-            A.notNull(cfg, "cfg");
-
-            if (!IGFS_SCHEME.equals(name.getScheme()))
-                throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
-                    "://[name]/[optional_path], actual=" + name + ']');
-
-            uriAuthority = name.getAuthority();
-
-            // Override sequential reads before prefetch if needed.
-            seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
-
-            if (seqReadsBeforePrefetch > 0)
-                seqReadsBeforePrefetchOverride = true;
-
-            // In Ignite replication factor is controlled by data cache affinity.
-            // We use replication factor to force the whole file to be stored on local node.
-            dfltReplication = (short)cfg.getInt("dfs.replication", 3);
-
-            // Get file colocation control flag.
-            colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
-            preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
-
-            // Get log directory.
-            String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
-
-            File logDirFile = U.resolveIgnitePath(logDirCfg);
-
-            String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
-
-            rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG);
-
-            // Handshake.
-            IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
-
-            grpBlockSize = handshake.blockSize();
-
-            IgfsPaths paths = handshake.secondaryPaths();
-
-            Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
-
-            if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
-                // Initiate client logger.
-                if (logDir == null)
-                    throw new IOException("Failed to resolve log directory: " + logDirCfg);
-
-                Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
-
-                clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
-            }
-            else
-                clientLog = IgfsLogger.disabledLogger();
-
-            modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
-
-            boolean initSecondary = paths.defaultMode() == PROXY;
-
-            if (!initSecondary && paths.pathModes() != null) {
-                for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
-                    IgfsMode mode = pathMode.getValue();
-
-                    if (mode == PROXY) {
-                        initSecondary = true;
-
-                        break;
-                    }
-                }
-            }
-
-            if (initSecondary) {
-                Map<String, String> props = paths.properties();
-
-                String secUri = props.get(SECONDARY_FS_URI);
-                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-
-                try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
-
-                    secondaryFs = secProvider.createAbstractFileSystem();
-                    secondaryUri = secProvider.uri();
-                }
-                catch (IOException e) {
-                    throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
-                }
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (rmtClient == null)
-                return;
-
-            rmtClient.close(false);
-
-            if (clientLog.isLogEnabled())
-                clientLog.close();
-
-            // Reset initialized resources.
-            rmtClient = null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public URI getUri() {
-        return uri;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getUriDefaultPort() {
-        return -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FsServerDefaults getServerDefaults() throws IOException {
-        return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024,
-            false, 0, DataChecksum.Type.NULL);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean setReplication(Path f, short replication) throws IOException {
-        return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
-        if (mode(f) == PROXY)
-            secondaryFs.setTimes(f, mtime, atime);
-        else {
-            if (mtime == -1 && atime == -1)
-                return;
-
-            rmtClient.setTimes(convert(f), atime, mtime);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FsStatus getFsStatus() throws IOException {
-        IgfsStatus status = rmtClient.fsStatus();
-
-        return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setPermission(Path p, FsPermission perm) throws IOException {
-        enterBusy();
-
-        try {
-            A.notNull(p, "p");
-
-            if (mode(p) == PROXY)
-                secondaryFs.setPermission(toSecondary(p), perm);
-            else {
-                if (rmtClient.update(convert(p), permission(perm)) == null)
-                    throw new IOException("Failed to set file permission (file not found?)" +
-                        " [path=" + p + ", perm=" + perm + ']');
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setOwner(Path p, String usr, String grp) throws IOException {
-        A.notNull(p, "p");
-        A.notNull(usr, "username");
-        A.notNull(grp, "grpName");
-
-        enterBusy();
-
-        try {
-            if (mode(p) == PROXY)
-                secondaryFs.setOwner(toSecondary(p), usr, grp);
-            else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
-                throw new IOException("Failed to set file permission (file not found?)" +
-                    " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
-
-                if (clientLog.isLogEnabled()) {
-                    // At this point we do not know file size, so we perform additional request to remote FS to get it.
-                    FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
-
-                    long size = status != null ? status.getLen() : -1;
-
-                    long logId = IgfsLogger.nextId();
-
-                    clientLog.logOpen(logId, path, PROXY, bufSize, size);
-
-                    return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId));
-                }
-                else
-                    return is;
-            }
-            else {
-                IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ?
-                    rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
-
-                long logId = -1;
-
-                if (clientLog.isLogEnabled()) {
-                    logId = IgfsLogger.nextId();
-
-                    clientLog.logOpen(logId, path, mode, bufSize, stream.length());
-                }
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
-                        ", bufSize=" + bufSize + ']');
-
-                IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(),
-                    bufSize, LOG, clientLog, logId);
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
-
-                return new FSDataInputStream(igfsIn);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override public FSDataOutputStream createInternal(
-        Path f,
-        EnumSet<CreateFlag> flag,
-        FsPermission perm,
-        int bufSize,
-        short replication,
-        long blockSize,
-        Progressable progress,
-        Options.ChecksumOpt checksumOpt,
-        boolean createParent
-    ) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-        boolean append = flag.contains(CreateFlag.APPEND);
-        boolean create = flag.contains(CreateFlag.CREATE);
-
-        OutputStream out = null;
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (LOG.isDebugEnabled())
-                LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" +
-                    path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
-
-            if (mode == PROXY) {
-                FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
-                    replication, blockSize, progress, checksumOpt, createParent);
-
-                if (clientLog.isLogEnabled()) {
-                    long logId = IgfsLogger.nextId();
-
-                    if (append)
-                        clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
-                    else
-                        clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
-
-                    return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId));
-                }
-                else
-                    return os;
-            }
-            else {
-                Map<String, String> permMap = F.asMap(PROP_PERMISSION, toString(perm),
-                    PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
-
-                // Create stream and close it in the 'finally' section if any sequential operation failed.
-                IgfsHadoopStreamDelegate stream;
-
-                long logId = -1;
-
-                if (append) {
-                    stream = rmtClient.append(path, create, permMap);
-
-                    if (clientLog.isLogEnabled()) {
-                        logId = IgfsLogger.nextId();
-
-                        clientLog.logAppend(logId, path, mode, bufSize);
-                    }
-
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
-                }
-                else {
-                    stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
-                        permMap);
-
-                    if (clientLog.isLogEnabled()) {
-                        logId = IgfsLogger.nextId();
-
-                        clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
-                    }
-
-                    if (LOG.isDebugEnabled())
-                        LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
-                }
-
-                assert stream != null;
-
-                IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG,
-                    clientLog, logId);
-
-                bufSize = Math.max(64 * 1024, bufSize);
-
-                out = new BufferedOutputStream(igfsOut, bufSize);
-
-                FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
-
-                // Mark stream created successfully.
-                out = null;
-
-                return res;
-            }
-        }
-        finally {
-            // Close if failed during stream creation.
-            if (out != null)
-                U.closeQuiet(out);
-
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsSymlinks() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void renameInternal(Path src, Path dst) throws IOException {
-        A.notNull(src, "src");
-        A.notNull(dst, "dst");
-
-        enterBusy();
-
-        try {
-            IgfsPath srcPath = convert(src);
-            IgfsPath dstPath = convert(dst);
-            Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(srcPath);
-
-            if (childrenModes.contains(PROXY)) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logRename(srcPath, PROXY, dstPath);
-
-                secondaryFs.renameInternal(toSecondary(src), toSecondary(dst));
-            }
-
-            rmtClient.rename(srcPath, dstPath);
-
-            if (clientLog.isLogEnabled())
-                clientLog.logRename(srcPath, modeRslvr.resolveMode(srcPath), dstPath);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-            Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path);
-
-            if (childrenModes.contains(PROXY)) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logDelete(path, PROXY, recursive);
-
-                return secondaryFs.delete(toSecondary(f), recursive);
-            }
-
-            boolean res = rmtClient.delete(path, recursive);
-
-            if (clientLog.isLogEnabled())
-                clientLog.logDelete(path, mode, recursive);
-
-            return res;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
-        // Checksum has effect for secondary FS only.
-        if (secondaryFs != null)
-            secondaryFs.setVerifyChecksum(verifyChecksum);
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileChecksum getFileChecksum(Path f) throws IOException {
-        if (mode(f) == PROXY)
-            return secondaryFs.getFileChecksum(f);
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
-
-                if (arr == null)
-                    throw new FileNotFoundException("File " + f + " does not exist.");
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = toPrimary(arr[i]);
-
-                if (clientLog.isLogEnabled()) {
-                    String[] fileArr = new String[arr.length];
-
-                    for (int i = 0; i < arr.length; i++)
-                        fileArr[i] = arr[i].getPath().toString();
-
-                    clientLog.logListDirectory(path, PROXY, fileArr);
-                }
-
-                return arr;
-            }
-            else {
-                Collection<IgfsFile> list = rmtClient.listFiles(path);
-
-                if (list == null)
-                    throw new FileNotFoundException("File " + f + " does not exist.");
-
-                List<IgfsFile> files = new ArrayList<>(list);
-
-                FileStatus[] arr = new FileStatus[files.size()];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = convert(files.get(i));
-
-                if (clientLog.isLogEnabled()) {
-                    String[] fileArr = new String[arr.length];
-
-                    for (int i = 0; i < arr.length; i++)
-                        fileArr[i] = arr[i].getPath().toString();
-
-                    clientLog.logListDirectory(path, mode, fileArr);
-                }
-
-                return arr;
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = modeRslvr.resolveMode(path);
-
-            if (mode == PROXY) {
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, PROXY);
-
-                secondaryFs.mkdir(toSecondary(f), perm, createParent);
-            }
-            else {
-                rmtClient.mkdirs(path, permission(perm));
-
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, mode);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            if (mode(f) == PROXY)
-                return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
-            else {
-                IgfsFile info = rmtClient.info(convert(f));
-
-                if (info == null)
-                    throw new FileNotFoundException("File not found: " + f);
-
-                return convert(info);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
-        A.notNull(path, "path");
-
-        IgfsPath igfsPath = convert(path);
-
-        enterBusy();
-
-        try {
-            if (modeRslvr.resolveMode(igfsPath) == PROXY)
-                return secondaryFs.getFileBlockLocations(path, start, len);
-            else {
-                long now = System.currentTimeMillis();
-
-                List<IgfsBlockLocation> affinity = new ArrayList<>(
-                    rmtClient.affinity(igfsPath, start, len));
-
-                BlockLocation[] arr = new BlockLocation[affinity.size()];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = convert(affinity.get(i));
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
-                        (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
-
-                return arr;
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Resolve path mode.
-     *
-     * @param path HDFS path.
-     * @return Path mode.
-     */
-    public IgfsMode mode(Path path) {
-        return modeRslvr.resolveMode(convert(path));
-    }
-
-    /**
-     * Convert the given path to path acceptable by the primary file system.
-     *
-     * @param path Path.
-     * @return Primary file system path.
-     */
-    private Path toPrimary(Path path) {
-        return convertPath(path, getUri());
-    }
-
-    /**
-     * Convert the given path to path acceptable by the secondary file system.
-     *
-     * @param path Path.
-     * @return Secondary file system path.
-     */
-    private Path toSecondary(Path path) {
-        assert secondaryFs != null;
-        assert secondaryUri != null;
-
-        return convertPath(path, secondaryUri);
-    }
-
-    /**
-     * Convert path using the given new URI.
-     *
-     * @param path Old path.
-     * @param newUri New URI.
-     * @return New path.
-     */
-    private Path convertPath(Path path, URI newUri) {
-        assert newUri != null;
-
-        if (path != null) {
-            URI pathUri = path.toUri();
-
-            try {
-                return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
-                    pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
-            }
-            catch (URISyntaxException e) {
-                throw new IgniteException("Failed to construct secondary file system path from the primary file " +
-                    "system path: " + path, e);
-            }
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Convert a file status obtained from the secondary file system to a status of the primary file system.
-     *
-     * @param status Secondary file system status.
-     * @return Primary file system status.
-     */
-    private FileStatus toPrimary(FileStatus status) {
-        return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
-            status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
-            status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        return new Path(IGFS_SCHEME, uriAuthority, path.toString());
-    }
-
-    /**
-     * Convert Hadoop path into IGFS path.
-     *
-     * @param path Hadoop path.
-     * @return IGFS path.
-     */
-    @Nullable private IgfsPath convert(Path path) {
-        if (path == null)
-            return null;
-
-        return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(workingDir, path.toUri().getPath());
-    }
-
-    /**
-     * Convert IGFS affinity block location into Hadoop affinity block location.
-     *
-     * @param block IGFS affinity block location.
-     * @return Hadoop affinity block location.
-     */
-    private BlockLocation convert(IgfsBlockLocation block) {
-        Collection<String> names = block.names();
-        Collection<String> hosts = block.hosts();
-
-        return new BlockLocation(
-            names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
-            hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
-            block.start(), block.length()
-        ) {
-            @Override public String toString() {
-                try {
-                    return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
-                        ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    /**
-     * Convert IGFS file information into Hadoop file status.
-     *
-     * @param file IGFS file information.
-     * @return Hadoop file status.
-     */
-    private FileStatus convert(IgfsFile file) {
-        return new FileStatus(
-            file.length(),
-            file.isDirectory(),
-            dfltReplication,
-            file.groupBlockSize(),
-            file.modificationTime(),
-            file.accessTime(),
-            permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME),
-            file.property(PROP_GROUP_NAME, "users"),
-            convert(file.path())) {
-            @Override public String toString() {
-                return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
-            }
-        };
-    }
-
-    /**
-     * Convert Hadoop permission into IGFS file attribute.
-     *
-     * @param perm Hadoop permission.
-     * @return IGFS attributes.
-     */
-    private Map<String, String> permission(FsPermission perm) {
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        return F.asMap(PROP_PERMISSION, toString(perm));
-    }
-
-    /**
-     * @param perm Permission.
-     * @return String.
-     */
-    private static String toString(FsPermission perm) {
-        return String.format("%04o", perm.toShort());
-    }
-
-    /**
-     * Convert IGFS file attributes into Hadoop permission.
-     *
-     * @param file File info.
-     * @return Hadoop permission.
-     */
-    private FsPermission permission(IgfsFile file) {
-        String perm = file.property(PROP_PERMISSION, null);
-
-        if (perm == null)
-            return FsPermission.getDefault();
-
-        try {
-            return new FsPermission((short)Integer.parseInt(perm, 8));
-        }
-        catch (NumberFormatException ignore) {
-            return FsPermission.getDefault();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsHadoopFileSystem.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html
deleted file mode 100644
index 6df66f4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains Hadoop 2.x <code>FileSystem</code> wrapper for Ignite file system.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html
deleted file mode 100644
index ec38a21..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains <b>IG</b>nite <b>F</b>ile <b>S</b>ystem APIs.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
deleted file mode 100644
index 27d6e33..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Facade for communication with grid.
- */
-public interface IgfsHadoop {
-    /**
-     * Perform handshake.
-     *
-     * @param logDir Log directory.
-     * @return Future with handshake result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
-
-    /**
-     * Close connection.
-     *
-     * @param force Force flag.
-     */
-    public void close(boolean force);
-
-    /**
-     * Command to retrieve file info for some IGFS path.
-     *
-     * @param path Path to get file info for.
-     * @return Future for info operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to update file properties.
-     *
-     * @param path IGFS path to update properties.
-     * @param props Properties to update.
-     * @return Future for update operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Sets last access time and last modification time for a file.
-     *
-     * @param path Path to update times.
-     * @param accessTime Last access time to set.
-     * @param modificationTime Last modification time to set.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Command to rename given path.
-     *
-     * @param src Source path.
-     * @param dest Destination path.
-     * @return Future for rename operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to delete given path.
-     *
-     * @param path Path to delete.
-     * @param recursive {@code True} if deletion is recursive.
-     * @return Future for delete operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get affinity for given path, offset and length.
-     *
-     * @param path Path to get affinity for.
-     * @param start Start position (offset).
-     * @param len Data length.
-     * @return Future for affinity command.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Gets path summary.
-     *
-     * @param path Path to get summary for.
-     * @return Future that will be completed when summary is received.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to create directories.
-     *
-     * @param path Path to create.
-     * @return Future for mkdirs operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get list of files in directory.
-     *
-     * @param path Path to list.
-     * @return Future for listFiles operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get directory listing.
-     *
-     * @param path Path to list.
-     * @return Future for listPaths operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Performs status request.
-     *
-     * @return Status response.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to open file for reading.
-     *
-     * @param path File path to open.
-     * @return Future for open operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to open file for reading.
-     *
-     * @param path File path to open.
-     * @return Future for open operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Command to create file and open it for output.
-     *
-     * @param path Path to file.
-     * @param overwrite If {@code true} then old file contents will be lost.
-     * @param colocate If {@code true} and called on data node, file will be written on that node.
-     * @param replication Replication factor.
-     * @param props File properties for creation.
-     * @return Stream descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Open file for output appending data to the end of a file.
-     *
-     * @param path Path to file.
-     * @param create If {@code true}, file will be created if does not exist.
-     * @param props File properties.
-     * @return Stream descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
deleted file mode 100644
index 03bf733..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-
-/**
- * Communication exception indicating a problem between file system and IGFS instance.
- */
-public class IgfsHadoopCommunicationException extends IgniteCheckedException {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Creates new exception with given throwable as a nested cause and
-     * source of error message.
-     *
-     * @param cause Non-null throwable cause.
-     */
-    public IgfsHadoopCommunicationException(Exception cause) {
-        super(cause);
-    }
-
-    /**
-     * Creates a new exception with given error message and optional nested cause exception.
-     *
-     * @param msg Error message.
-     */
-    public IgfsHadoopCommunicationException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * Creates a new exception with given error message and optional nested cause exception.
-     *
-     * @param msg Error message.
-     * @param cause Cause.
-     */
-    public IgfsHadoopCommunicationException(String msg, Exception cause) {
-        super(msg, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
deleted file mode 100644
index 35638ea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * IGFS endpoint abstraction.
- */
-public class IgfsHadoopEndpoint {
-    /** Localhost. */
-    public static final String LOCALHOST = "127.0.0.1";
-
-    /** IGFS name. */
-    private final String igfsName;
-
-    /** Grid name. */
-    private final String gridName;
-
-    /** Host. */
-    private final String host;
-
-    /** Port. */
-    private final int port;
-
-    /**
-     * Normalize IGFS URI.
-     *
-     * @param uri URI.
-     * @return Normalized URI.
-     * @throws IOException If failed.
-     */
-    public static URI normalize(URI uri) throws IOException {
-        try {
-            if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme()))
-                throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri);
-
-            IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority());
-
-            StringBuilder sb = new StringBuilder();
-
-            if (endpoint.igfs() != null)
-                sb.append(endpoint.igfs());
-
-            if (endpoint.grid() != null)
-                sb.append(":").append(endpoint.grid());
-
-            return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
-                uri.getPath(), uri.getQuery(), uri.getFragment());
-        }
-        catch (URISyntaxException | IgniteCheckedException e) {
-            throw new IOException("Failed to normalize URI: " + uri, e);
-        }
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param connStr Connection string.
-     * @throws IgniteCheckedException If failed to parse connection string.
-     */
-    public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException {
-        if (connStr == null)
-            connStr = "";
-
-        String[] tokens = connStr.split("@", -1);
-
-        IgniteBiTuple<String, Integer> hostPort;
-
-        if (tokens.length == 1) {
-            igfsName = null;
-            gridName = null;
-
-            hostPort = hostPort(connStr, connStr);
-        }
-        else if (tokens.length == 2) {
-            String authStr = tokens[0];
-
-            if (authStr.isEmpty()) {
-                gridName = null;
-                igfsName = null;
-            }
-            else {
-                String[] authTokens = authStr.split(":", -1);
-
-                igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
-
-                if (authTokens.length == 1)
-                    gridName = null;
-                else if (authTokens.length == 2)
-                    gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
-                else
-                    throw new IgniteCheckedException("Invalid connection string format: " + connStr);
-            }
-
-            hostPort = hostPort(connStr, tokens[1]);
-        }
-        else
-            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
-
-        host = hostPort.get1();
-
-        assert hostPort.get2() != null;
-
-        port = hostPort.get2();
-    }
-
-    /**
-     * Parse host and port.
-     *
-     * @param connStr Full connection string.
-     * @param hostPortStr Host/port connection string part.
-     * @return Tuple with host and port.
-     * @throws IgniteCheckedException If failed to parse connection string.
-     */
-    private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException {
-        String[] tokens = hostPortStr.split(":", -1);
-
-        String host = tokens[0];
-
-        if (F.isEmpty(host))
-            host = LOCALHOST;
-
-        int port;
-
-        if (tokens.length == 1)
-            port = DFLT_IPC_PORT;
-        else if (tokens.length == 2) {
-            String portStr = tokens[1];
-
-            try {
-                port = Integer.valueOf(portStr);
-
-                if (port < 0 || port > 65535)
-                    throw new IgniteCheckedException("Invalid port number: " + connStr);
-            }
-            catch (NumberFormatException e) {
-                throw new IgniteCheckedException("Invalid port number: " + connStr);
-            }
-        }
-        else
-            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
-
-        return F.t(host, port);
-    }
-
-    /**
-     * @return IGFS name.
-     */
-    @Nullable public String igfs() {
-        return igfsName;
-    }
-
-    /**
-     * @return Grid name.
-     */
-    @Nullable public String grid() {
-        return gridName;
-    }
-
-    /**
-     * @return Host.
-     */
-    public String host() {
-        return host;
-    }
-
-    /**
-     * @return Host.
-     */
-    public boolean isLocal() {
-        return F.eq(LOCALHOST, host);
-    }
-
-    /**
-     * @return Port.
-     */
-    public int port() {
-        return port;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsHadoopEndpoint.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
deleted file mode 100644
index da86e37..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Extended IGFS server interface.
- */
-public interface IgfsHadoopEx extends IgfsHadoop {
-    /**
-     * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
-     * If connection is closed already, callback will be invoked synchronously inside this method.
-     *
-     * @param delegate Stream delegate.
-     * @param lsnr Event listener.
-     */
-    public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr);
-
-    /**
-     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
-     *
-     * @param delegate Stream delegate.
-     */
-    public void removeEventListener(IgfsHadoopStreamDelegate delegate);
-
-    /**
-     * Asynchronously reads specified amount of bytes from opened input stream.
-     *
-     * @param delegate Stream delegate.
-     * @param pos Position to read from.
-     * @param len Data length to read.
-     * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining
-     *     bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will
-     *     be the result of read future.
-     * @param outOff Output offset.
-     * @param outLen Output length.
-     * @return Read data.
-     */
-    public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len,
-        @Nullable final byte[] outBuf, final int outOff, final int outLen);
-
-    /**
-     * Writes data to the stream with given streamId. This method does not return any future since
-     * no response to write request is sent.
-     *
-     * @param delegate Stream delegate.
-     * @param data Data to write.
-     * @param off Offset.
-     * @param len Length.
-     * @throws IOException If failed.
-     */
-    public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
-
-    /**
-     * Close server stream.
-     *
-     * @param delegate Stream delegate.
-     * @throws IOException If failed.
-     */
-    public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException;
-
-    /**
-     * Flush output stream.
-     *
-     * @param delegate Stream delegate.
-     * @throws IOException If failed.
-     */
-    public void flush(IgfsHadoopStreamDelegate delegate) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
deleted file mode 100644
index c9d1322..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.fs.permission.*;
-import org.apache.ignite.*;
-
-import java.util.*;
-
-import static org.apache.ignite.IgniteFs.*;
-
-/**
- * Hadoop file system properties.
- */
-public class IgfsHadoopFSProperties {
-    /** Username. */
-    private String usrName;
-
-    /** Group name. */
-    private String grpName;
-
-    /** Permissions. */
-    private FsPermission perm;
-
-    /**
-     * Constructor.
-     *
-     * @param props Properties.
-     * @throws IgniteException In case of error.
-     */
-    public IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException {
-        usrName = props.get(PROP_USER_NAME);
-        grpName = props.get(PROP_GROUP_NAME);
-
-        String permStr = props.get(PROP_PERMISSION);
-
-        if (permStr != null) {
-            try {
-                perm = new FsPermission((short)Integer.parseInt(permStr, 8));
-            }
-            catch (NumberFormatException ignore) {
-                throw new IgniteException("Permissions cannot be parsed: " + permStr);
-            }
-        }
-    }
-
-    /**
-     * Get user name.
-     *
-     * @return User name.
-     */
-    public String userName() {
-        return usrName;
-    }
-
-    /**
-     * Get group name.
-     *
-     * @return Group name.
-     */
-    public String groupName() {
-        return grpName;
-    }
-
-    /**
-     * Get permission.
-     *
-     * @return Permission.
-     */
-    public FsPermission permission() {
-        return perm;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
deleted file mode 100644
index 476641c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-/**
- * IGFS client future that holds response parse closure.
- */
-public class IgfsHadoopFuture<T> extends GridPlainFutureAdapter<T> {
-    /** Output buffer. */
-    private byte[] outBuf;
-
-    /** Output offset. */
-    private int outOff;
-
-    /** Output length. */
-    private int outLen;
-
-    /** Read future flag. */
-    private boolean read;
-
-    /**
-     * @return Output buffer.
-     */
-    public byte[] outputBuffer() {
-        return outBuf;
-    }
-
-    /**
-     * @param outBuf Output buffer.
-     */
-    public void outputBuffer(@Nullable byte[] outBuf) {
-        this.outBuf = outBuf;
-    }
-
-    /**
-     * @return Offset in output buffer to write from.
-     */
-    public int outputOffset() {
-        return outOff;
-    }
-
-    /**
-     * @param outOff Offset in output buffer to write from.
-     */
-    public void outputOffset(int outOff) {
-        this.outOff = outOff;
-    }
-
-    /**
-     * @return Length to write to output buffer.
-     */
-    public int outputLength() {
-        return outLen;
-    }
-
-    /**
-     * @param outLen Length to write to output buffer.
-     */
-    public void outputLength(int outLen) {
-        this.outLen = outLen;
-    }
-
-    /**
-     * @param read {@code True} if this is a read future.
-     */
-    public void read(boolean read) {
-        this.read = read;
-    }
-
-    /**
-     * @return {@code True} if this is a read future.
-     */
-    public boolean read() {
-        return read;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
deleted file mode 100644
index 8245125..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Communication with grid in the same process.
- */
-public class IgfsHadoopInProc implements IgfsHadoopEx {
-    /** Target IGFS. */
-    private final IgfsEx igfs;
-
-    /** Buffer size. */
-    private final int bufSize;
-
-    /** Event listeners. */
-    private final Map<IgfsHadoopStreamDelegate, IgfsHadoopStreamEventListener> lsnrs =
-        new ConcurrentHashMap<>();
-
-    /** Logger. */
-    private final Log log;
-
-    /**
-     * Constructor.
-     *
-     * @param igfs Target IGFS.
-     * @param log Log.
-     */
-    public IgfsHadoopInProc(IgfsEx igfs, Log log) {
-        this.igfs = igfs;
-        this.log = log;
-
-        bufSize = igfs.configuration().getBlockSize() * 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) {
-        igfs.clientLogDirectory(logDir);
-
-        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-            igfs.globalSampling());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        // Perform cleanup.
-        for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) {
-            try {
-                lsnr.onClose();
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to notify stream event listener", e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
-        try {
-            return igfs.info(path);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
-        try {
-            return igfs.update(path, props);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
-        try {
-            igfs.setTimes(path, accessTime, modificationTime);
-
-            return true;
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " +
-                path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
-        try {
-            igfs.rename(src, dest);
-
-            return true;
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
-        try {
-            return igfs.delete(path, recursive);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
-        try {
-            return igfs.globalSpace();
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " +
-                "stopping.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
-        try {
-            return igfs.listPaths(path);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
-        try {
-            return igfs.listFiles(path);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
-        try {
-            igfs.mkdirs(path, props);
-
-            return true;
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " +
-                path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
-        try {
-            return igfs.summary(path);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " +
-                path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
-        throws IgniteCheckedException {
-        try {
-            return igfs.affinity(path, start, len);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
-        try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
-
-            return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
-        throws IgniteCheckedException {
-        try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
-
-            return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
-        try {
-            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
-
-            return new IgfsHadoopStreamDelegate(this, stream);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
-        try {
-            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
-
-            return new IgfsHadoopStreamDelegate(this, stream);
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len,
-        @Nullable byte[] outBuf, int outOff, int outLen) {
-        IgfsInputStreamAdapter stream = delegate.target();
-
-        try {
-            byte[] res = null;
-
-            if (outBuf != null) {
-                int outTailLen = outBuf.length - outOff;
-
-                if (len <= outTailLen)
-                    stream.readFully(pos, outBuf, outOff, len);
-                else {
-                    stream.readFully(pos, outBuf, outOff, outTailLen);
-
-                    int remainderLen = len - outTailLen;
-
-                    res = new byte[remainderLen];
-
-                    stream.readFully(pos, res, 0, remainderLen);
-                }
-            } else {
-                res = new byte[len];
-
-                stream.readFully(pos, res, 0, len);
-            }
-
-            return new GridPlainFutureAdapter<>(res);
-        }
-        catch (IllegalStateException | IOException e) {
-            IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
-
-            if (lsnr != null)
-                lsnr.onError(e.getMessage());
-
-            return new GridPlainFutureAdapter<>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len)
-        throws IOException {
-        try {
-            IgfsOutputStream stream = delegate.target();
-
-            stream.write(data, off, len);
-        }
-        catch (IllegalStateException | IOException e) {
-            IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
-
-            if (lsnr != null)
-                lsnr.onError(e.getMessage());
-
-            if (e instanceof IllegalStateException)
-                throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
-            else
-                throw e;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException {
-        try {
-            IgfsOutputStream stream = delegate.target();
-
-            stream.flush();
-        }
-        catch (IllegalStateException | IOException e) {
-            IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
-
-            if (lsnr != null)
-                lsnr.onError(e.getMessage());
-
-            if (e instanceof IllegalStateException)
-                throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
-            else
-                throw e;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException {
-        Closeable closeable = desc.target();
-
-        try {
-            closeable.close();
-        }
-        catch (IllegalStateException e) {
-            throw new IOException("Failed to close IGFS stream because Grid is stopping.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addEventListener(IgfsHadoopStreamDelegate delegate,
-        IgfsHadoopStreamEventListener lsnr) {
-        IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
-
-        assert lsnr0 == null || lsnr0 == lsnr;
-
-        if (log.isDebugEnabled())
-            log.debug("Added stream event listener [delegate=" + delegate + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) {
-        IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate);
-
-        if (lsnr0 != null && log.isDebugEnabled())
-            log.debug("Removed stream event listener [delegate=" + delegate + ']');
-    }
-}


Mime
View raw message