ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [18/51] [abbrv] [partial] ignite git commit: IGNITE-3916: Created separate module.
Date Mon, 19 Sep 2016 10:27:08 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
deleted file mode 100644
index 68c0dc4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FsConstants;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Utilities for configuring file systems to support the separate working directory per each thread.
- */
-public class HadoopFileSystemsUtils {
-    /** Name of the property for setting working directory on create new local FS instance. */
-    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
-    /**
-     * Setup wrappers of filesystems to support the separate working directory.
-     *
-     * @param cfg Config for setup.
-     */
-    public static void setupFileSystems(Configuration cfg) {
-        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
-        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
-                HadoopLocalFileSystemV2.class.getName());
-    }
-
-    /**
-     * Gets the property name to disable file system cache.
-     * @param scheme The file system URI scheme.
-     * @return The property name. If scheme is null,
-     * returns "fs.null.impl.disable.cache".
-     */
-    public static String disableFsCachePropertyName(@Nullable String scheme) {
-        return String.format("fs.%s.impl.disable.cache", scheme);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
deleted file mode 100644
index 681cddb..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- * Maps values by keys.
- * Values are created lazily using {@link ValueFactory}.
- *
- * Despite of the name, does not depend on any Hadoop classes.
- */
-public class HadoopLazyConcurrentMap<K, V extends Closeable> {
-    /** The map storing the actual values. */
-    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
-
-    /** The factory passed in by the client. Will be used for lazy value creation. */
-    private final ValueFactory<K, V> factory;
-
-    /** Lock used to close the objects. */
-    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
-
-    /** Flag indicating that this map is closed and cleared. */
-    private boolean closed;
-
-    /**
-     * Constructor.
-     * @param factory the factory to create new values lazily.
-     */
-    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
-        this.factory = factory;
-
-        assert getClass().getClassLoader() == Ignite.class.getClassLoader();
-    }
-
-    /**
-     * Gets cached or creates a new value of V.
-     * Never returns null.
-     * @param k the key to associate the value with.
-     * @return the cached or newly created value, never null.
-     * @throws IgniteException on error
-     */
-    public V getOrCreate(K k) {
-        ValueWrapper w = map.get(k);
-
-        if (w == null) {
-            closeLock.readLock().lock();
-
-            try {
-                if (closed)
-                    throw new IllegalStateException("Failed to create value for key [" + k
-                        + "]: the map is already closed.");
-
-                final ValueWrapper wNew = new ValueWrapper(k);
-
-                w = map.putIfAbsent(k, wNew);
-
-                if (w == null) {
-                    wNew.init();
-
-                    w = wNew;
-                }
-            }
-            finally {
-                closeLock.readLock().unlock();
-            }
-        }
-
-        try {
-            V v = w.getValue();
-
-            assert v != null;
-
-            return v;
-        }
-        catch (IgniteCheckedException ie) {
-            throw new IgniteException(ie);
-        }
-    }
-
-    /**
-     * Clears the map and closes all the values.
-     */
-    public void close() throws IgniteCheckedException {
-        closeLock.writeLock().lock();
-
-        try {
-            if (closed)
-                return;
-
-            closed = true;
-
-            Exception err = null;
-
-            Set<K> keySet = map.keySet();
-
-            for (K key : keySet) {
-                V v = null;
-
-                try {
-                    v = map.get(key).getValue();
-                }
-                catch (IgniteCheckedException ignore) {
-                    // No-op.
-                }
-
-                if (v != null) {
-                    try {
-                        v.close();
-                    }
-                    catch (Exception err0) {
-                        if (err == null)
-                            err = err0;
-                    }
-                }
-            }
-
-            map.clear();
-
-            if (err != null)
-                throw new IgniteCheckedException(err);
-        }
-        finally {
-            closeLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Helper class that drives the lazy value creation.
-     */
-    private class ValueWrapper {
-        /** Future. */
-        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
-
-        /** the key */
-        private final K key;
-
-        /**
-         * Creates new wrapper.
-         */
-        private ValueWrapper(K key) {
-            this.key = key;
-        }
-
-        /**
-         * Initializes the value using the factory.
-         */
-        private void init() {
-            try {
-                final V v0 = factory.createValue(key);
-
-                if (v0 == null)
-                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
-
-                fut.onDone(v0);
-            }
-            catch (Throwable e) {
-                fut.onDone(e);
-            }
-        }
-
-        /**
-         * Gets the available value or blocks until the value is initialized.
-         * @return the value, never null.
-         * @throws IgniteCheckedException on error.
-         */
-        V getValue() throws IgniteCheckedException {
-            return fut.get();
-        }
-    }
-
-    /**
-     * Interface representing the factory that creates map values.
-     * @param <K> the type of the key.
-     * @param <V> the type of the value.
-     */
-    public interface ValueFactory <K, V> {
-        /**
-         * Creates the new value. Should never return null.
-         *
-         * @param key the key to create value for
-         * @return the value.
-         * @throws IOException On failure.
-         */
-        public V createValue(K key) throws IOException;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
deleted file mode 100644
index cbb007f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.File;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class HadoopLocalFileSystemV1 extends LocalFileSystem {
-    /**
-     * Creates new local file system.
-     */
-    public HadoopLocalFileSystemV1() {
-        super(new HadoopRawLocalFileSystem());
-    }
-
-    /** {@inheritDoc} */
-    @Override public File pathToFile(Path path) {
-        return ((HadoopRawLocalFileSystem)getRaw()).convert(path);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
deleted file mode 100644
index 2484492..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFs;
-import org.apache.hadoop.fs.DelegateToFileSystem;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.local.LocalConfigKeys;
-
-import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class HadoopLocalFileSystemV2 extends ChecksumFs {
-    /**
-     * Creates new local file system.
-     *
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
-        super(new DelegateFS(cfg));
-    }
-
-    /**
-     * Creates new local file system.
-     *
-     * @param uri URI.
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
-        this(cfg);
-    }
-
-    /**
-     * Delegate file system.
-     */
-    private static class DelegateFS extends DelegateToFileSystem {
-        /**
-         * Creates new local file system.
-         *
-         * @param cfg Configuration.
-         * @throws IOException If failed.
-         * @throws URISyntaxException If failed.
-         */
-        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
-            super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getUriDefaultPort() {
-            return -1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public FsServerDefaults getServerDefaults() throws IOException {
-            return LocalConfigKeys.getServerDefaults();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isValidName(String src) {
-            return true;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
deleted file mode 100644
index 0aac4a3..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-/**
- * This class lists parameters that can be specified in Hadoop configuration.
- * Hadoop configuration can be specified in {@code core-site.xml} file
- * or passed to map-reduce task directly when using Hadoop driver for IGFS file system:
- * <ul>
- *     <li>
- *         {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
- *         the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
- *         IGFS data node configuration property.
- *     </li>
- *     <li>
- *         {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
- *         {@code true}, then all file system operations will be logged to a file.
- *     </li>
- *     <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
- *     <li>
- *         {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
- *         it gets flushed to log file. Higher values will imply greater performance, but will increase delay
- *         before record appears in the log file.
- *     </li>
- *     <li>
- *         {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
- *         node to which client is connected. If {@code true}, file will not be distributed and will be written
- *         to a single data node. Default value is {@code true}.
- *     </li>
- *     <li>
- *         {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to
- *         local data node if it has enough free space. After some time it can be redistributed across nodes though.
- *     </li>
- * </ul>
- * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
- * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
- * <p>
- * Sample configuration that can be placed to {@code core-site.xml} file:
- * <pre name="code" class="xml">
- *     &lt;property&gt;
- *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.enabled&lt;/name&gt;
- *         &lt;value&gt;true&lt;/value&gt;
- *     &lt;/property&gt;
- *     &lt;property&gt;
- *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.dir&lt;/name&gt;
- *         &lt;value&gt;/home/apache/ignite/log/sampling&lt;/value&gt;
- *     &lt;/property&gt;
- *     &lt;property&gt;
- *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.batch_size&lt;/name&gt;
- *         &lt;value&gt;16&lt;/value&gt;
- *     &lt;/property&gt;
- * </pre>
- * Parameters could also be specified per mapreduce job, e.g.
- * <pre name="code" class="bash">
- * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
- * </pre>
- * If you want to use these parameters in code, then you have to substitute you file system name in it. The easiest
- * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
- */
-public class HadoopParameters {
-    /** Parameter name for control over file colocation write mode. */
-    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
-
-    /** Parameter name for custom sequential reads before prefetch value. */
-    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
-        "fs.igfs.%s.open.sequential_reads_before_prefetch";
-
-    /** Parameter name for client logger directory. */
-    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
-
-    /** Parameter name for log batch size. */
-    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
-
-    /** Parameter name for log enabled flag. */
-    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
-
-    /** Parameter name for prefer local writes flag. */
-    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
deleted file mode 100644
index b8fc8e7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.nio.file.Files;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsConstants;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Local file system implementation for Hadoop.
- */
-public class HadoopRawLocalFileSystem extends FileSystem {
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
-        @Override protected Path initialValue() {
-            return getInitialWorkingDirectory();
-        }
-    };
-
-    /**
-     * Converts Hadoop path to local path.
-     *
-     * @param path Hadoop path.
-     * @return Local path.
-     */
-    File convert(Path path) {
-        checkPath(path);
-
-        if (path.isAbsolute())
-            return new File(path.toUri().getPath());
-
-        return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        return makeQualified(new Path(System.getProperty("user.home")));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getInitialWorkingDirectory() {
-        File f = new File(System.getProperty("user.dir"));
-
-        return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setConf(conf);
-
-        String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
-
-        if (initWorkDir != null)
-            setWorkingDirectory(new Path(initWorkDir));
-    }
-
-    /** {@inheritDoc} */
-    @Override public URI getUri() {
-        return FsConstants.LOCAL_FS_URI;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-        return new FSDataInputStream(new InStream(checkExists(convert(f))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
-        short replication, long blockSize, Progressable progress) throws IOException {
-        File file = convert(f);
-
-        if (!overwrite && !file.createNewFile())
-            throw new IOException("Failed to create new file: " + f.toUri());
-
-        return out(file, false, bufSize);
-    }
-
-    /**
-     * @param file File.
-     * @param append Append flag.
-     * @return Output stream.
-     * @throws IOException If failed.
-     */
-    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
-        return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
-            bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
-        return out(convert(f), true, bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean rename(Path src, Path dst) throws IOException {
-        return convert(src).renameTo(convert(dst));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        File file = convert(f);
-
-        if (file.isDirectory() && !recursive)
-            throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
-
-        return U.delete(file);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        workDir.set(fixRelativePart(dir));
-
-        checkPath(dir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workDir.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-        if(f == null)
-            throw new IllegalArgumentException("mkdirs path arg is null");
-
-        Path parent = f.getParent();
-
-        File p2f = convert(f);
-
-        if(parent != null) {
-            File parent2f = convert(parent);
-
-            if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
-                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
-
-        }
-
-        return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        return fileStatus(checkExists(convert(f)));
-    }
-
-    /**
-     * @return File status.
-     */
-    private FileStatus fileStatus(File file) throws IOException {
-        boolean dir = file.isDirectory();
-
-        java.nio.file.Path path = dir ? null : file.toPath();
-
-        return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
-            /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
-            new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
-    }
-
-    /**
-     * @param file File.
-     * @return Same file.
-     * @throws FileNotFoundException If does not exist.
-     */
-    private static File checkExists(File file) throws FileNotFoundException {
-        if (!file.exists())
-            throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
-
-        return file;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        File file = convert(f);
-
-        if (checkExists(file).isFile())
-            return new FileStatus[] {fileStatus(file)};
-
-        File[] files = file.listFiles();
-
-        FileStatus[] res = new FileStatus[files.length];
-
-        for (int i = 0; i < res.length; i++)
-            res[i] = fileStatus(files[i]);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsSymlinks() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
-        Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
-        return getFileStatus(getLinkTarget(f));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getLinkTarget(Path f) throws IOException {
-        File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
-
-        return new Path(file.toURI());
-    }
-
-    /**
-     * Input stream.
-     */
-    private static class InStream extends InputStream implements Seekable, PositionedReadable {
-        /** */
-        private final RandomAccessFile file;
-
-        /**
-         * @param f File.
-         * @throws IOException If failed.
-         */
-        public InStream(File f) throws IOException {
-            file = new RandomAccessFile(f, "r");
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read() throws IOException {
-            return file.read();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
-            return file.read(b, off, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void close() throws IOException {
-            file.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-            long pos0 = file.getFilePointer();
-
-            file.seek(pos);
-            int res = file.read(buf, off, len);
-
-            file.seek(pos0);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-            if (read(pos, buf, off, len) != len)
-                throw new IOException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf) throws IOException {
-            readFully(pos, buf, 0, buf.length);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void seek(long pos) throws IOException {
-            file.seek(pos);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized long getPos() throws IOException {
-            return file.getFilePointer();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean seekToNewSource(long targetPos) throws IOException {
-            return false;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
deleted file mode 100644
index fe43596..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Facade for communication with grid.
- */
-public interface HadoopIgfs {
-    /**
-     * Perform handshake.
-     *
-     * @param logDir Log directory.
-     * @return Future with handshake result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
-
-    /**
-     * Close connection.
-     *
-     * @param force Force flag.
-     */
-    public void close(boolean force);
-
-    /**
-     * Command to retrieve file info for some IGFS path.
-     *
-     * @param path Path to get file info for.
-     * @return Future for info operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to update file properties.
-     *
-     * @param path IGFS path to update properties.
-     * @param props Properties to update.
-     * @return Future for update operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Sets last access time and last modification time for a file.
-     *
-     * @param path Path to update times.
-     * @param accessTime Last access time to set.
-     * @param modificationTime Last modification time to set.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Command to rename given path.
-     *
-     * @param src Source path.
-     * @param dest Destination path.
-     * @return Future for rename operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to delete given path.
-     *
-     * @param path Path to delete.
-     * @param recursive {@code True} if deletion is recursive.
-     * @return Future for delete operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get affinity for given path, offset and length.
-     *
-     * @param path Path to get affinity for.
-     * @param start Start position (offset).
-     * @param len Data length.
-     * @return Future for affinity command.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Gets path summary.
-     *
-     * @param path Path to get summary for.
-     * @return Future that will be completed when summary is received.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to create directories.
-     *
-     * @param path Path to create.
-     * @return Future for mkdirs operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get list of files in directory.
-     *
-     * @param path Path to list.
-     * @return Future for listFiles operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get directory listing.
-     *
-     * @param path Path to list.
-     * @return Future for listPaths operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Performs status request.
-     *
-     * @return Status response.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to open file for reading.
-     *
-     * @param path File path to open.
-     * @return Future for open operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to open file for reading.
-     *
-     * @param path File path to open.
-     * @return Future for open operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Command to create file and open it for output.
-     *
-     * @param path Path to file.
-     * @param overwrite If {@code true} then old file contents will be lost.
-     * @param colocate If {@code true} and called on data node, file will be written on that node.
-     * @param replication Replication factor.
-     * @param props File properties for creation.
-     * @return Stream descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Open file for output appending data to the end of a file.
-     *
-     * @param path Path to file.
-     * @param create If {@code true}, file will be created if does not exist.
-     * @param props File properties.
-     * @return Stream descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
deleted file mode 100644
index d610091..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * Communication exception indicating a problem between file system and IGFS instance.
- */
-public class HadoopIgfsCommunicationException extends IgniteCheckedException {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Creates new exception with given throwable as a nested cause and
-     * source of error message.
-     *
-     * @param cause Non-null throwable cause.
-     */
-    public HadoopIgfsCommunicationException(Exception cause) {
-        super(cause);
-    }
-
-    /**
-     * Creates a new exception with given error message and optional nested cause exception.
-     *
-     * @param msg Error message.
-     */
-    public HadoopIgfsCommunicationException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * Creates a new exception with given error message and optional nested cause exception.
-     *
-     * @param msg Error message.
-     * @param cause Cause.
-     */
-    public HadoopIgfsCommunicationException(String msg, Exception cause) {
-        super(msg, cause);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
deleted file mode 100644
index 014e2a1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Extended IGFS server interface.
- */
-public interface HadoopIgfsEx extends HadoopIgfs {
-    /**
-     * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
-     * If connection is closed already, callback will be invoked synchronously inside this method.
-     *
-     * @param delegate Stream delegate.
-     * @param lsnr Event listener.
-     */
-    public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr);
-
-    /**
-     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
-     *
-     * @param delegate Stream delegate.
-     */
-    public void removeEventListener(HadoopIgfsStreamDelegate delegate);
-
-    /**
-     * Asynchronously reads specified amount of bytes from opened input stream.
-     *
-     * @param delegate Stream delegate.
-     * @param pos Position to read from.
-     * @param len Data length to read.
-     * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining
-     *     bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will
-     *     be the result of read future.
-     * @param outOff Output offset.
-     * @param outLen Output length.
-     * @return Read data.
-     */
-    public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
-        @Nullable final byte[] outBuf, final int outOff, final int outLen);
-
-    /**
-     * Writes data to the stream with given streamId. This method does not return any future since
-     * no response to write request is sent.
-     *
-     * @param delegate Stream delegate.
-     * @param data Data to write.
-     * @param off Offset.
-     * @param len Length.
-     * @throws IOException If failed.
-     */
-    public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
-
-    /**
-     * Close server stream.
-     *
-     * @param delegate Stream delegate.
-     * @throws IOException If failed.
-     */
-    public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException;
-
-    /**
-     * Flush output stream.
-     *
-     * @param delegate Stream delegate.
-     * @throws IOException If failed.
-     */
-    public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
-
-    /**
-     * The user this Igfs instance works on behalf of.
-     * @return the user name.
-     */
-    public String user();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
deleted file mode 100644
index 5ff1b2e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * IGFS client future that holds response parse closure.
- */
-public class HadoopIgfsFuture<T> extends GridFutureAdapter<T> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Output buffer. */
-    private byte[] outBuf;
-
-    /** Output offset. */
-    private int outOff;
-
-    /** Output length. */
-    private int outLen;
-
-    /** Read future flag. */
-    private boolean read;
-
-    /**
-     * @return Output buffer.
-     */
-    public byte[] outputBuffer() {
-        return outBuf;
-    }
-
-    /**
-     * @param outBuf Output buffer.
-     */
-    public void outputBuffer(@Nullable byte[] outBuf) {
-        this.outBuf = outBuf;
-    }
-
-    /**
-     * @return Offset in output buffer to write from.
-     */
-    public int outputOffset() {
-        return outOff;
-    }
-
-    /**
-     * @param outOff Offset in output buffer to write from.
-     */
-    public void outputOffset(int outOff) {
-        this.outOff = outOff;
-    }
-
-    /**
-     * @return Length to write to output buffer.
-     */
-    public int outputLength() {
-        return outLen;
-    }
-
-    /**
-     * @param outLen Length to write to output buffer.
-     */
-    public void outputLength(int outLen) {
-        this.outLen = outLen;
-    }
-
-    /**
-     * @param read {@code True} if this is a read future.
-     */
-    public void read(boolean read) {
-        this.read = read;
-    }
-
-    /**
-     * @return {@code True} if this is a read future.
-     */
-    public boolean read() {
-        return read;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
deleted file mode 100644
index 3220538..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.logging.Log;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsInputStream;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Communication with grid in the same process.
- */
-public class HadoopIgfsInProc implements HadoopIgfsEx {
-    /** Target IGFS. */
-    private final IgfsEx igfs;
-
-    /** Buffer size. */
-    private final int bufSize;
-
-    /** Event listeners. */
-    private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> lsnrs =
-        new ConcurrentHashMap<>();
-
-    /** Logger. */
-    private final Log log;
-
-    /** The user this Igfs works on behalf of. */
-    private final String user;
-
-    /**
-     * Constructor.
-     *
-     * @param igfs Target IGFS.
-     * @param log Log.
-     */
-    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
-        this.user = IgfsUtils.fixUserName(userName);
-
-        this.igfs = igfs;
-
-        this.log = log;
-
-        bufSize = igfs.configuration().getBlockSize() * 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(final String logDir) {
-        return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
-            @Override public IgfsHandshakeResponse apply() {
-                igfs.clientLogDirectory(logDir);
-
-                return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-                    igfs.globalSampling());
-                }
-         });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        // Perform cleanup.
-        for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
-            try {
-                lsnr.onClose();
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to notify stream event listener", e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
-                @Override public IgfsFile apply() {
-                    return igfs.info(path);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
-                @Override public IgfsFile apply() {
-                    return igfs.update(path, props);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
-        try {
-            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
-                @Override public Void apply() {
-                    igfs.setTimes(path, accessTime, modificationTime);
-
-                    return null;
-                }
-            });
-
-            return true;
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " +
-                path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
-        try {
-            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
-                @Override public Void apply() {
-                    igfs.rename(src, dest);
-
-                    return null;
-                }
-            });
-
-            return true;
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
-                @Override public Boolean apply() {
-                    return igfs.delete(path, recursive);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
-                @Override public IgfsStatus call() throws IgniteCheckedException {
-                    return igfs.globalSpace();
-                }
-            });
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
-                "stopping.");
-        }
-        catch (IgniteCheckedException | RuntimeException | Error e) {
-            throw e;
-        }
-        catch (Exception e) {
-            throw new AssertionError("Must never go there.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
-                @Override public Collection<IgfsPath> apply() {
-                    return igfs.listPaths(path);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
-                @Override public Collection<IgfsFile> apply() {
-                    return igfs.listFiles(path);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
-        try {
-            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
-                @Override public Void apply() {
-                    igfs.mkdirs(path, props);
-
-                    return null;
-                }
-            });
-
-            return true;
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " +
-                path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
-                @Override public IgfsPathSummary apply() {
-                    return igfs.summary(path);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " +
-                path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
-        throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
-                @Override public Collection<IgfsBlockLocation> apply() {
-                    return igfs.affinity(path, start, len);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate apply() {
-                    IgfsInputStream stream = igfs.open(path, bufSize);
-
-                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
-        throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate apply() {
-                    IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
-
-                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
-        final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate apply() {
-                    IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                        colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
-
-                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
-        final @Nullable Map<String, String> props) throws IgniteCheckedException {
-        try {
-            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
-                @Override public HadoopIgfsStreamDelegate apply() {
-                    IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
-
-                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
-                }
-            });
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (IllegalStateException e) {
-            throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
-        @Nullable byte[] outBuf, int outOff, int outLen) {
-        IgfsInputStream stream = delegate.target();
-
-        try {
-            byte[] res = null;
-
-            if (outBuf != null) {
-                int outTailLen = outBuf.length - outOff;
-
-                if (len <= outTailLen)
-                    stream.readFully(pos, outBuf, outOff, len);
-                else {
-                    stream.readFully(pos, outBuf, outOff, outTailLen);
-
-                    int remainderLen = len - outTailLen;
-
-                    res = new byte[remainderLen];
-
-                    stream.readFully(pos, res, 0, remainderLen);
-                }
-            } else {
-                res = new byte[len];
-
-                stream.readFully(pos, res, 0, len);
-            }
-
-            return new GridFinishedFuture<>(res);
-        }
-        catch (IllegalStateException | IOException e) {
-            HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
-
-            if (lsnr != null)
-                lsnr.onError(e.getMessage());
-
-            return new GridFinishedFuture<>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len)
-        throws IOException {
-        try {
-            IgfsOutputStream stream = delegate.target();
-
-            stream.write(data, off, len);
-        }
-        catch (IllegalStateException | IOException e) {
-            HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
-
-            if (lsnr != null)
-                lsnr.onError(e.getMessage());
-
-            if (e instanceof IllegalStateException)
-                throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
-            else
-                throw e;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
-        try {
-            IgfsOutputStream stream = delegate.target();
-
-            stream.flush();
-        }
-        catch (IllegalStateException | IOException e) {
-            HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
-
-            if (lsnr != null)
-                lsnr.onError(e.getMessage());
-
-            if (e instanceof IllegalStateException)
-                throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
-            else
-                throw e;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
-        Closeable closeable = desc.target();
-
-        try {
-            closeable.close();
-        }
-        catch (IllegalStateException e) {
-            throw new IOException("Failed to close IGFS stream because Grid is stopping.", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addEventListener(HadoopIgfsStreamDelegate delegate,
-        HadoopIgfsStreamEventListener lsnr) {
-        HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
-
-        assert lsnr0 == null || lsnr0 == lsnr;
-
-        if (log.isDebugEnabled())
-            log.debug("Added stream event listener [delegate=" + delegate + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) {
-        HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate);
-
-        if (lsnr0 != null && log.isDebugEnabled())
-            log.debug("Removed stream event listener [delegate=" + delegate + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public String user() {
-        return user;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java
deleted file mode 100644
index 46b46d7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java
+++ /dev/null
@@ -1,629 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * IGFS input stream wrapper for hadoop interfaces.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable,
-    HadoopIgfsStreamEventListener {
-    /** Minimum buffer size. */
-    private static final int MIN_BUF_SIZE = 4 * 1024;
-
-    /** Server stream delegate. */
-    private HadoopIgfsStreamDelegate delegate;
-
-    /** Stream ID used by logger. */
-    private long logStreamId;
-
-    /** Stream position. */
-    private long pos;
-
-    /** Stream read limit. */
-    private long limit;
-
-    /** Mark position. */
-    private long markPos = -1;
-
-    /** Prefetch buffer. */
-    private DoubleFetchBuffer buf = new DoubleFetchBuffer();
-
-    /** Buffer half size for double-buffering. */
-    private int bufHalfSize;
-
-    /** Closed flag. */
-    private volatile boolean closed;
-
-    /** Flag set if stream was closed due to connection breakage. */
-    private boolean connBroken;
-
-    /** Logger. */
-    private Log log;
-
-    /** Client logger. */
-    private IgfsLogger clientLog;
-
-    /** Read time. */
-    private long readTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of read bytes. */
-    private long total;
-
-    /**
-     * Creates input stream.
-     *
-     * @param delegate Server stream delegate.
-     * @param limit Read limit.
-     * @param bufSize Buffer size.
-     * @param log Log.
-     * @param clientLog Client logger.
-     */
-    public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log,
-        IgfsLogger clientLog, long logStreamId) {
-        assert limit >= 0;
-
-        this.delegate = delegate;
-        this.limit = limit;
-        this.log = log;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE);
-
-        lastTs = System.nanoTime();
-
-        delegate.hadoop().addEventListener(delegate, this);
-    }
-
-    /**
-     * Read start.
-     */
-    private void readStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void readEnd() {
-        long now = System.nanoTime();
-
-        readTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read() throws IOException {
-        checkClosed();
-
-        readStart();
-
-        try {
-            if (eof())
-                return -1;
-
-            buf.refreshAhead(pos);
-
-            int res = buf.atPosition(pos);
-
-            pos++;
-            total++;
-
-            buf.refreshAhead(pos);
-
-            return res;
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
-        checkClosed();
-
-        if (eof())
-            return -1;
-
-        readStart();
-
-        try {
-            long remaining = limit - pos;
-
-            int read = buf.flatten(b, pos, off, len);
-
-            pos += read;
-            total += read;
-            remaining -= read;
-
-            if (remaining > 0 && read != len) {
-                int readAmt = (int)Math.min(remaining, len - read);
-
-                delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get();
-
-                read += readAmt;
-                pos += readAmt;
-                total += readAmt;
-            }
-
-            buf.refreshAhead(pos);
-
-            return read;
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long skip(long n) throws IOException {
-        checkClosed();
-
-        if (clientLog.isLogEnabled())
-            clientLog.logSkip(logStreamId, n);
-
-        long oldPos = pos;
-
-        if (pos + n <= limit)
-            pos += n;
-        else
-            pos = limit;
-
-        buf.refreshAhead(pos);
-
-        return pos - oldPos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int available() throws IOException {
-        checkClosed();
-
-        int available = buf.available(pos);
-
-        assert available >= 0;
-
-        return available;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void close() throws IOException {
-        if (!closed) {
-            readStart();
-
-            if (log.isDebugEnabled())
-                log.debug("Closing input stream: " + delegate);
-
-            delegate.hadoop().closeStream(delegate);
-
-            readEnd();
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
-
-            markClosed(false);
-
-            if (log.isDebugEnabled())
-                log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
-                    ", userTime=" + userTime + ']');
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void mark(int readLimit) {
-        markPos = pos;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logMark(logStreamId, readLimit);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void reset() throws IOException {
-        checkClosed();
-
-        if (clientLog.isLogEnabled())
-            clientLog.logReset(logStreamId);
-
-        if (markPos == -1)
-            throw new IOException("Stream was not marked.");
-
-        pos = markPos;
-
-        buf.refreshAhead(pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean markSupported() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
-        long remaining = limit - position;
-
-        int read = (int)Math.min(len, remaining);
-
-        // Return -1 at EOF.
-        if (read == 0)
-            return -1;
-
-        readFully(position, buf, off, read);
-
-        return read;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException {
-        long remaining = limit - position;
-
-        checkClosed();
-
-        if (len > remaining)
-            throw new EOFException("End of stream reached before data was fully read.");
-
-        readStart();
-
-        try {
-            int read = this.buf.flatten(buf, position, off, len);
-
-            total += read;
-
-            if (read != len) {
-                int readAmt = len - read;
-
-                delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get();
-
-                total += readAmt;
-            }
-
-            if (clientLog.isLogEnabled())
-                clientLog.logRandomRead(logStreamId, position, len);
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFully(long position, byte[] buf) throws IOException {
-        readFully(position, buf, 0, buf.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void seek(long pos) throws IOException {
-        A.ensure(pos >= 0, "position must be non-negative");
-
-        checkClosed();
-
-        if (clientLog.isLogEnabled())
-            clientLog.logSeek(logStreamId, pos);
-
-        if (pos > limit)
-            pos = limit;
-
-        if (log.isDebugEnabled())
-            log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']');
-
-        this.pos = pos;
-
-        buf.refreshAhead(pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long getPos() {
-        return pos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized boolean seekToNewSource(long targetPos) {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClose() {
-        markClosed(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onError(String errMsg) {
-        // No-op.
-    }
-
-    /**
-     * Marks stream as closed.
-     *
-     * @param connBroken {@code True} if connection with server was lost.
-     */
-    private void markClosed(boolean connBroken) {
-        // It is ok to have race here.
-        if (!closed) {
-            closed = true;
-
-            this.connBroken = connBroken;
-
-            delegate.hadoop().removeEventListener(delegate);
-        }
-    }
-
-    /**
-     * @throws IOException If check failed.
-     */
-    private void checkClosed() throws IOException {
-        if (closed) {
-            if (connBroken)
-                throw new IOException("Server connection was lost.");
-            else
-                throw new IOException("Stream is closed.");
-        }
-    }
-
-    /**
-     * @return {@code True} if end of stream reached.
-     */
-    private boolean eof() {
-        return limit == pos;
-    }
-
-    /**
-     * Asynchronous prefetch buffer.
-     */
-    private static class FetchBufferPart {
-        /** Read future. */
-        private IgniteInternalFuture<byte[]> readFut;
-
-        /** Position of cached chunk in file. */
-        private long pos;
-
-        /** Prefetch length. Need to store as read future result might be not available yet. */
-        private int len;
-
-        /**
-         * Creates fetch buffer part.
-         *
-         * @param readFut Read future for this buffer.
-         * @param pos Read position.
-         * @param len Chunk length.
-         */
-        private FetchBufferPart(IgniteInternalFuture<byte[]> readFut, long pos, int len) {
-            this.readFut = readFut;
-            this.pos = pos;
-            this.len = len;
-        }
-
-        /**
-         * Copies cached data if specified position matches cached region.
-         *
-         * @param dst Destination buffer.
-         * @param pos Read position in file.
-         * @param dstOff Offset in destination buffer from which start writing.
-         * @param len Maximum number of bytes to copy.
-         * @return Number of bytes copied.
-         * @throws IgniteCheckedException If read future failed.
-         */
-        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
-            // If read start position is within cached boundaries.
-            if (contains(pos)) {
-                byte[] data = readFut.get();
-
-                int srcPos = (int)(pos - this.pos);
-                int cpLen = Math.min(len, data.length - srcPos);
-
-                U.arrayCopy(data, srcPos, dst, dstOff, cpLen);
-
-                return cpLen;
-            }
-
-            return 0;
-        }
-
-        /**
-         * @return {@code True} if data is ready to be read.
-         */
-        public boolean ready() {
-            return readFut.isDone();
-        }
-
-        /**
-         * Checks if current buffer part contains given position.
-         *
-         * @param pos Position to check.
-         * @return {@code True} if position matches buffer region.
-         */
-        public boolean contains(long pos) {
-            return this.pos <= pos && this.pos + len > pos;
-        }
-    }
-
-    private class DoubleFetchBuffer {
-        /**  */
-        private FetchBufferPart first;
-
-        /** */
-        private FetchBufferPart second;
-
-        /**
-         * Copies fetched data from both buffers to destination array if cached region matched read position.
-         *
-         * @param dst Destination buffer.
-         * @param pos Read position in file.
-         * @param dstOff Destination buffer offset.
-         * @param len Maximum number of bytes to copy.
-         * @return Number of bytes copied.
-         * @throws IgniteCheckedException If any read operation failed.
-         */
-        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
-            assert dstOff >= 0;
-            assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
-                ", len=" + len + ']';
-
-            int bytesCopied = 0;
-
-            if (first != null) {
-                bytesCopied += first.flatten(dst, pos, dstOff, len);
-
-                if (bytesCopied != len && second != null) {
-                    assert second.pos == first.pos + first.len;
-
-                    bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
-                }
-            }
-
-            return bytesCopied;
-        }
-
-        /**
-         * Gets byte at specified position in buffer.
-         *
-         * @param pos Stream position.
-         * @return Read byte.
-         * @throws IgniteCheckedException If read failed.
-         */
-        public int atPosition(long pos) throws IgniteCheckedException {
-            // Should not reach here if stream contains no data.
-            assert first != null;
-
-            if (first.contains(pos)) {
-                byte[] bytes = first.readFut.get();
-
-                return bytes[((int)(pos - first.pos))] & 0xFF;
-            }
-            else {
-                assert second != null;
-                assert second.contains(pos);
-
-                byte[] bytes = second.readFut.get();
-
-                return bytes[((int)(pos - second.pos))] & 0xFF;
-            }
-        }
-
-        /**
-         * Starts asynchronous buffer refresh if needed, depending on current position.
-         *
-         * @param pos Current stream position.
-         */
-        public void refreshAhead(long pos) {
-            if (fullPrefetch(pos)) {
-                first = fetch(pos, bufHalfSize);
-                second = fetch(pos + bufHalfSize, bufHalfSize);
-            }
-            else if (needFlip(pos)) {
-                first = second;
-
-                second = fetch(first.pos + first.len, bufHalfSize);
-            }
-        }
-
-        /**
-         * @param pos Position from which read is expected.
-         * @return Number of bytes available to be read without blocking.
-         */
-        public int available(long pos) {
-            int available = 0;
-
-            if (first != null) {
-                if (first.contains(pos)) {
-                    if (first.ready()) {
-                        available += (pos - first.pos);
-
-                        if (second != null && second.ready())
-                            available += second.len;
-                    }
-                }
-                else {
-                    if (second != null && second.contains(pos) && second.ready())
-                        available += (pos - second.pos);
-                }
-            }
-
-            return available;
-        }
-
-        /**
-         * Checks if position shifted enough to forget previous buffer.
-         *
-         * @param pos Current position.
-         * @return {@code True} if need flip buffers.
-         */
-        private boolean needFlip(long pos) {
-            // Return true if we read more then half of second buffer.
-            return second != null && second.contains(pos);
-        }
-
-        /**
-         * Determines if all cached bytes should be discarded and new region should be
-         * prefetched.
-         *
-         * @param curPos Current stream position.
-         * @return {@code True} if need to refresh both blocks.
-         */
-        private boolean fullPrefetch(long curPos) {
-            // If no data was prefetched yet, return true.
-            return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
-        }
-
-        /**
-         * Starts asynchronous fetch for given region.
-         *
-         * @param pos Position to read from.
-         * @param size Number of bytes to read.
-         * @return Fetch buffer part.
-         */
-        private FetchBufferPart fetch(long pos, int size) {
-            long remaining = limit - pos;
-
-            size = (int)Math.min(size, remaining);
-
-            return size <= 0 ? null :
-                new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
-        }
-    }
-}
\ No newline at end of file


Mime
View raw message