ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [03/52] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 13:14:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java
deleted file mode 100644
index af0c47a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/IgfsHadoopWrapper.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.fs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.fs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.fs.hadoop.IgfsHadoopEndpoint.*;
-import static org.apache.ignite.internal.fs.hadoop.IgfsHadoopUtils.*;
-
-/**
- * Wrapper for GGFS server.
- */
-public class IgfsHadoopWrapper implements IgfsHadoop {
-    /** Delegate. */
-    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
-
-    /** Authority. */
-    private final String authority;
-
-    /** Connection string. */
-    private final IgfsHadoopEndpoint endpoint;
-
-    /** Log directory. */
-    private final String logDir;
-
-    /** Configuration. */
-    private final Configuration conf;
-
-    /** Logger. */
-    private final Log log;
-
-    /**
-     * Constructor.
-     *
-     * @param authority Authority (connection string).
-     * @param logDir Log directory for server.
-     * @param conf Configuration.
-     * @param log Current logger.
-     */
-    public IgfsHadoopWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
-        try {
-            this.authority = authority;
-            this.endpoint = new IgfsHadoopEndpoint(authority);
-            this.logDir = logDir;
-            this.conf = conf;
-            this.log = log;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to parse endpoint: " + authority, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
-            @Override public IgfsHandshakeResponse apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hndResp;
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        Delegate delegate = delegateRef.get();
-
-        if (delegate != null && delegateRef.compareAndSet(delegate, null))
-            delegate.close(force);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
-            @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.info(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
-            @Override public IgfsFile apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.update(path, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.setTimes(path, accessTime, modificationTime);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.rename(src, dest);
-            }
-        }, src);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.delete(path, recursive);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
-        final long len) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
-            @Override public Collection<IgfsBlockLocation> apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.affinity(path, start, len);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
-            @Override public IgfsPathSummary apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.contentSummary(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.mkdirs(path, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
-            @Override public Collection<IgfsFile> apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.listFiles(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
-            @Override public Collection<IgfsPath> apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.listPaths(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
-            @Override public IgfsStatus apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.fsStatus();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate open(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
-            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.open(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
-            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.open(path, seqReadsBeforePrefetch);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate create(final IgfsPath path, final boolean overwrite,
-        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
-            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHadoopStreamDelegate append(final IgfsPath path, final boolean create,
-        @Nullable final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHadoopStreamDelegate>() {
-            @Override public IgfsHadoopStreamDelegate apply(IgfsHadoopEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.append(path, create, props);
-            }
-        }, path);
-    }
-
-    /**
-     * Execute closure which is not path-specific.
-     *
-     * @param clo Closure.
-     * @return Result.
-     * @throws IOException If failed.
-     */
-    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
-        return withReconnectHandling(clo, null);
-    }
-
-    /**
-     * Execute closure.
-     *
-     * @param clo Closure.
-     * @param path Path for exceptions.
-     * @return Result.
-     * @throws IOException If failed.
-     */
-    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
-        throws IOException {
-        Exception err = null;
-
-        for (int i = 0; i < 2; i++) {
-            Delegate curDelegate = null;
-
-            boolean close = false;
-            boolean force = false;
-
-            try {
-                curDelegate = delegate();
-
-                assert curDelegate != null;
-
-                close = curDelegate.doomed;
-
-                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
-            }
-            catch (IgfsHadoopCommunicationException e) {
-                if (curDelegate != null && !curDelegate.doomed) {
-                    // Try getting rid fo faulty delegate ASAP.
-                    delegateRef.compareAndSet(curDelegate, null);
-
-                    close = true;
-                    force = true;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message to a server: " + e);
-
-                err = e;
-            }
-            catch (IgniteCheckedException e) {
-                throw IgfsHadoopUtils.cast(e, path != null ? path.toString() : null);
-            }
-            finally {
-                if (close) {
-                    assert curDelegate != null;
-
-                    curDelegate.close(force);
-                }
-            }
-        }
-
-        throw new IOException("Failed to communicate with GGFS.", err);
-    }
-
-    /**
-     * Get delegate creating it if needed.
-     *
-     * @return Delegate.
-     */
-    private Delegate delegate() throws IgfsHadoopCommunicationException {
-        Exception err = null;
-
-        // 1. If delegate is set, return it immediately.
-        Delegate curDelegate = delegateRef.get();
-
-        if (curDelegate != null)
-            return curDelegate;
-
-        // 2. Guess that we are in the same VM.
-        if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_EMBED, authority, false)) {
-            IgfsEx ggfs = null;
-
-            if (endpoint.grid() == null) {
-                try {
-                    Ignite ignite = G.ignite();
-
-                    ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs());
-                }
-                catch (Exception e) {
-                    err = e;
-                }
-            }
-            else {
-                for (Ignite ignite : G.allGrids()) {
-                    try {
-                        ggfs = (IgfsEx)ignite.fileSystem(endpoint.ggfs());
-
-                        break;
-                    }
-                    catch (Exception e) {
-                        err = e;
-                    }
-                }
-            }
-
-            if (ggfs != null) {
-                IgfsHadoopEx hadoop = null;
-
-                try {
-                    hadoop = new IgfsHadoopInProc(ggfs, log);
-
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof IgfsHadoopCommunicationException)
-                        hadoop.close(true);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to in-proc GGFS, fallback to IPC mode.", e);
-
-                    err = e;
-                }
-            }
-        }
-
-        // 3. Try connecting using shmem.
-        if (!parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false)) {
-            if (curDelegate == null && !U.isWindows()) {
-                IgfsHadoopEx hadoop = null;
-
-                try {
-                    hadoop = new IgfsHadoopOutProc(endpoint.port(), endpoint.grid(), endpoint.ggfs(), log);
-
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof IgfsHadoopCommunicationException)
-                        hadoop.close(true);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to out-proc local GGFS using shmem.", e);
-
-                    err = e;
-                }
-            }
-        }
-
-        // 4. Try local TCP connection.
-        boolean skipLocTcp = parameter(conf, PARAM_GGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
-
-        if (!skipLocTcp) {
-            if (curDelegate == null) {
-                IgfsHadoopEx hadoop = null;
-
-                try {
-                    hadoop = new IgfsHadoopOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.ggfs(),
-                        log);
-
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof IgfsHadoopCommunicationException)
-                        hadoop.close(true);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to out-proc local GGFS using TCP.", e);
-
-                    err = e;
-                }
-            }
-        }
-
-        // 5. Try remote TCP connection.
-        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
-            IgfsHadoopEx hadoop = null;
-
-            try {
-                hadoop = new IgfsHadoopOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.ggfs(), log);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof IgfsHadoopCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to out-proc remote GGFS using TCP.", e);
-
-                err = e;
-            }
-        }
-
-        if (curDelegate != null) {
-            if (!delegateRef.compareAndSet(null, curDelegate))
-                curDelegate.doomed = true;
-
-            return curDelegate;
-        }
-        else
-            throw new IgfsHadoopCommunicationException("Failed to connect to GGFS: " + endpoint, err);
-    }
-
-    /**
-     * File system operation closure.
-     */
-    private static interface FileSystemClosure<T> {
-        /**
-         * Call closure body.
-         *
-         * @param hadoop RPC handler.
-         * @param hndResp Handshake response.
-         * @return Result.
-         * @throws IgniteCheckedException If failed.
-         * @throws IOException If failed.
-         */
-        public T apply(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
-    }
-
-    /**
-     * Delegate.
-     */
-    private static class Delegate {
-        /** RPC handler. */
-        private final IgfsHadoopEx hadoop;
-
-        /** Handshake request. */
-        private final IgfsHandshakeResponse hndResp;
-
-        /** Close guard. */
-        private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-        /** Whether this delegate must be closed at the end of the next invocation. */
-        private boolean doomed;
-
-        /**
-         * Constructor.
-         *
-         * @param hadoop Hadoop.
-         * @param hndResp Handshake response.
-         */
-        private Delegate(IgfsHadoopEx hadoop, IgfsHandshakeResponse hndResp) {
-            this.hadoop = hadoop;
-            this.hndResp = hndResp;
-        }
-
-        /**
-         * Close underlying RPC handler.
-         *
-         * @param force Force flag.
-         */
-        private void close(boolean force) {
-            if (closeGuard.compareAndSet(false, true))
-                hadoop.close(force);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html
deleted file mode 100644
index 4520df8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/hadoop/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains GGFS client classes.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html
deleted file mode 100644
index 4f47151..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/fs/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Contains GGFS client and common classes.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
new file mode 100644
index 0000000..325dd16
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Facade for communication with grid.
+ */
+public interface IgfsHadoop {
+    /**
+     * Perform handshake.
+     *
+     * @param logDir Log directory.
+     * @return Handshake response.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
+
+    /**
+     * Close connection.
+     *
+     * @param force Force flag.
+     */
+    public void close(boolean force);
+
+    /**
+     * Command to retrieve file info for some GGFS path.
+     *
+     * @param path Path to get file info for.
+     * @return File info for the given path.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to update file properties.
+     *
+     * @param path GGFS path to update properties.
+     * @param props Properties to update.
+     * @return Result of the update operation as {@link IgfsFile}.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+    /**
+     * Sets last access time and last modification time for a file.
+     *
+     * @param path Path to update times.
+     * @param accessTime Last access time to set.
+     * @param modificationTime Last modification time to set.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to rename given path.
+     *
+     * @param src Source path.
+     * @param dest Destination path.
+     * @return Result of the rename operation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to delete given path.
+     *
+     * @param path Path to delete.
+     * @param recursive {@code True} if deletion is recursive.
+     * @return Result of the delete operation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to get affinity for given path, offset and length.
+     *
+     * @param path Path to get affinity for.
+     * @param start Start position (offset).
+     * @param len Data length.
+     * @return Block locations returned by the affinity command.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Gets path summary.
+     *
+     * @param path Path to get summary for.
+     * @return Summary for the given path.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to create directories.
+     *
+     * @param path Path to create.
+     * @return Result of the mkdirs operation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to get list of files in directory.
+     *
+     * @param path Path to list.
+     * @return Files returned by the listFiles operation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to get directory listing.
+     *
+     * @param path Path to list.
+     * @return Paths returned by the listPaths operation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Performs status request.
+     *
+     * @return Status response.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading.
+     *
+     * @param path File path to open.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading; variant that also takes {@code seqReadsBeforePrefetch}.
+     *
+     * @param path File path to open.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to create file and open it for output.
+     *
+     * @param path Path to file.
+     * @param overwrite If {@code true} then old file contents will be lost.
+     * @param colocate If {@code true} and called on data node, file will be written on that node.
+     * @param replication Replication factor.
+     * @param props File properties for creation.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+
+    /**
+     * Open file for output appending data to the end of a file.
+     *
+     * @param path Path to file.
+     * @param create If {@code true}, file will be created if does not exist.
+     * @param props File properties.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
new file mode 100644
index 0000000..f892fb1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * Communication exception indicating a problem between file system and GGFS instance.
+ * <p>
+ * This is a checked exception: it extends {@link IgniteCheckedException} and only delegates
+ * to the corresponding superclass constructors.
+ */
+public class IgfsHadoopCommunicationException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new exception with given throwable as a nested cause and
+     * source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public IgfsHadoopCommunicationException(Exception cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a new exception with given error message and no nested cause.
+     *
+     * @param msg Error message.
+     */
+    public IgfsHadoopCommunicationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new exception with given error message and nested cause exception.
+     *
+     * @param msg Error message.
+     * @param cause Cause.
+     */
+    public IgfsHadoopCommunicationException(String msg, Exception cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
new file mode 100644
index 0000000..42feaae
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+
+/**
+ * GGFS endpoint abstraction.
+ */
+public class IgfsHadoopEndpoint {
+    /** Localhost. */
+    public static final String LOCALHOST = "127.0.0.1";
+
+    /** GGFS name. */
+    private final String ggfsName;
+
+    /** Grid name. */
+    private final String gridName;
+
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
+    /**
+     * Normalize GGFS URI.
+     *
+     * @param uri URI.
+     * @return Normalized URI.
+     * @throws IOException If failed.
+     */
+    public static URI normalize(URI uri) throws IOException {
+        try {
+            if (!F.eq(IgniteFs.GGFS_SCHEME, uri.getScheme()))
+                throw new IOException("Failed to normalize UIR because it has non GGFS scheme: " + uri);
+
+            IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority());
+
+            StringBuilder sb = new StringBuilder();
+
+            if (endpoint.ggfs() != null)
+                sb.append(endpoint.ggfs());
+
+            if (endpoint.grid() != null)
+                sb.append(":").append(endpoint.grid());
+
+            return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
+                uri.getPath(), uri.getQuery(), uri.getFragment());
+        }
+        catch (URISyntaxException | IgniteCheckedException e) {
+            throw new IOException("Failed to normalize URI: " + uri, e);
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param connStr Connection string.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException {
+        if (connStr == null)
+            connStr = "";
+
+        String[] tokens = connStr.split("@", -1);
+
+        IgniteBiTuple<String, Integer> hostPort;
+
+        if (tokens.length == 1) {
+            ggfsName = null;
+            gridName = null;
+
+            hostPort = hostPort(connStr, connStr);
+        }
+        else if (tokens.length == 2) {
+            String authStr = tokens[0];
+
+            if (authStr.isEmpty()) {
+                gridName = null;
+                ggfsName = null;
+            }
+            else {
+                String[] authTokens = authStr.split(":", -1);
+
+                ggfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
+
+                if (authTokens.length == 1)
+                    gridName = null;
+                else if (authTokens.length == 2)
+                    gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
+                else
+                    throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+            }
+
+            hostPort = hostPort(connStr, tokens[1]);
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+        host = hostPort.get1();
+
+        assert hostPort.get2() != null;
+
+        port = hostPort.get2();
+    }
+
+    /**
+     * Parse host and port.
+     *
+     * @param connStr Full connection string.
+     * @param hostPortStr Host/port connection string part.
+     * @return Tuple with host and port.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException {
+        String[] tokens = hostPortStr.split(":", -1);
+
+        String host = tokens[0];
+
+        if (F.isEmpty(host))
+            host = LOCALHOST;
+
+        int port;
+
+        if (tokens.length == 1)
+            port = DFLT_IPC_PORT;
+        else if (tokens.length == 2) {
+            String portStr = tokens[1];
+
+            try {
+                port = Integer.valueOf(portStr);
+
+                if (port < 0 || port > 65535)
+                    throw new IgniteCheckedException("Invalid port number: " + connStr);
+            }
+            catch (NumberFormatException e) {
+                throw new IgniteCheckedException("Invalid port number: " + connStr);
+            }
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+        return F.t(host, port);
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    @Nullable public String ggfs() {
+        return ggfsName;
+    }
+
+    /**
+     * @return Grid name.
+     */
+    @Nullable public String grid() {
+        return gridName;
+    }
+
+    /**
+     * @return Host.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * @return Host.
+     */
+    public boolean isLocal() {
+        return F.eq(LOCALHOST, host);
+    }
+
+    /**
+     * @return Port.
+     */
+    public int port() {
+        return port;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsHadoopEndpoint.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
new file mode 100644
index 0000000..aa9de45
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Extended GGFS server interface.
+ * <p>
+ * Adds stream-level operations (event listeners, data read/write, flush and close) on top of
+ * {@link IgfsHadoop}. Streams are identified by {@link IgfsHadoopStreamDelegate} instances.
+ */
+public interface IgfsHadoopEx extends IgfsHadoop {
+    /**
+     * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
+     * If connection is closed already, callback will be invoked synchronously inside this method.
+     *
+     * @param delegate Stream delegate.
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+     *
+     * @param delegate Stream delegate.
+     */
+    public void removeEventListener(IgfsHadoopStreamDelegate delegate);
+
+    /**
+     * Asynchronously reads specified amount of bytes from opened input stream.
+     *
+     * @param delegate Stream delegate.
+     * @param pos Position to read from.
+     * @param len Data length to read.
+     * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
+     *     bytes will be read into new allocated buffer of length {@code len - outBuf.length} and this buffer will
+     *     be the result of read future.
+     * @param outOff Output offset.
+     * @param outLen Output length.
+     * @return Future for the read data.
+     */
+    public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len,
+        @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+    /**
+     * Writes data to the stream with given streamId. This method does not return any future since
+     * no response to write request is sent.
+     *
+     * @param delegate Stream delegate.
+     * @param data Data to write.
+     * @param off Offset.
+     * @param len Length.
+     * @throws IOException If failed.
+     */
+    public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
+
+    /**
+     * Close server stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException;
+
+    /**
+     * Flush output stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void flush(IgfsHadoopStreamDelegate delegate) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
new file mode 100644
index 0000000..e0ea1b6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+
+import java.util.*;
+
+import static org.apache.ignite.IgniteFs.*;
+
+/**
+ * Hadoop file system properties.
+ * <p>
+ * Immutable holder for user name, group name and permission extracted from a GGFS properties map.
+ */
+class IgfsHadoopFSProperties {
+    /** Username. */
+    private final String usrName;
+
+    /** Group name. */
+    private final String grpName;
+
+    /** Permissions. */
+    private final FsPermission perm;
+
+    /**
+     * Constructor.
+     *
+     * @param props Properties, {@code null} is treated as an empty map.
+     * @throws IgniteException If permission string is present but is not a valid octal number.
+     */
+    IgfsHadoopFSProperties(Map<String, String> props) throws IgniteException {
+        // Callers may forward optional (nullable) property maps, so do not fail on null.
+        if (props == null)
+            props = Collections.emptyMap();
+
+        usrName = props.get(PROP_USER_NAME);
+        grpName = props.get(PROP_GROUP_NAME);
+
+        String permStr = props.get(PROP_PERMISSION);
+
+        perm = permStr != null ? parsePermission(permStr) : null;
+    }
+
+    /**
+     * Parses octal permission string.
+     *
+     * @param permStr Permission string in octal notation.
+     * @return Parsed permission.
+     * @throws IgniteException If string is not a valid octal number.
+     */
+    private static FsPermission parsePermission(String permStr) throws IgniteException {
+        try {
+            return new FsPermission((short)Integer.parseInt(permStr, 8));
+        }
+        catch (NumberFormatException e) {
+            // Preserve the original cause instead of silently dropping it.
+            throw new IgniteException("Permissions cannot be parsed: " + permStr, e);
+        }
+    }
+
+    /**
+     * Get user name.
+     *
+     * @return User name or {@code null} if it was not set in properties.
+     */
+    String userName() {
+        return usrName;
+    }
+
+    /**
+     * Get group name.
+     *
+     * @return Group name or {@code null} if it was not set in properties.
+     */
+    String groupName() {
+        return grpName;
+    }
+
+    /**
+     * Get permission.
+     *
+     * @return Permission or {@code null} if it was not set in properties.
+     */
+    FsPermission permission() {
+        return perm;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
new file mode 100644
index 0000000..fbf27f0
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFileSystemWrapper.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}.
+ * <p>
+ * {@link IOException}s thrown by the underlying file system are converted to {@link IgfsException}s
+ * by {@link #handleSecondaryFsError(IOException, String)}.
+ */
+public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable {
+    /** Property name for path to Hadoop configuration. */
+    public static final String SECONDARY_FS_CONFIG_PATH = "SECONDARY_FS_CONFIG_PATH";
+
+    /** Property name for URI of file system. */
+    public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
+
+    /** Hadoop file system. */
+    private final FileSystem fileSys;
+
+    /** Properties of file system */
+    private final Map<String, String> props = new HashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param uri URI of file system. If {@code null}, file system is resolved from configuration only.
+     * @param cfgPath Additional path to Hadoop configuration.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException {
+        Configuration cfg = new Configuration();
+
+        if (cfgPath != null)
+            cfg.addResource(U.resolveIgniteUrl(cfgPath));
+
+        try {
+            fileSys = uri == null ? FileSystem.get(cfg) : FileSystem.get(new URI(uri), cfg);
+        }
+        catch (IOException | URISyntaxException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        // Publish the URI actually reported by the file system, always with a trailing slash.
+        uri = fileSys.getUri().toString();
+
+        if (!uri.endsWith("/"))
+            uri += "/";
+
+        props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        props.put(SECONDARY_FS_URI, uri);
+    }
+
+    /**
+     * Convert GGFS path into Hadoop path.
+     *
+     * @param path GGFS path.
+     * @return Hadoop path.
+     */
+    private Path convert(IgfsPath path) {
+        URI uri = fileSys.getUri();
+
+        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
+    }
+
+    /**
+     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
+     *
+     * @param e Exception to check.
+     * @param detailMsg Detailed error message.
+     * @return Appropriate exception.
+     */
+    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
+        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
+            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
+
+        if (wrongVer)
+            return new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
+                "version.", e);
+
+        return cast(detailMsg, e);
+    }
+
+    /**
+     * Cast IO exception to GGFS exception.
+     *
+     * @param msg Error message. Not used for {@link FileNotFoundException} and
+     *      {@link PathIsNotEmptyDirectoryException}, which are wrapped with the cause only.
+     * @param e IO exception.
+     * @return GGFS exception.
+     */
+    public static IgfsException cast(String msg, IOException e) {
+        if (e instanceof FileNotFoundException)
+            return new IgfsFileNotFoundException(e);
+        else if (e instanceof ParentNotDirectoryException)
+            return new IgfsParentNotDirectoryException(msg, e);
+        else if (e instanceof PathIsNotEmptyDirectoryException)
+            return new IgfsDirectoryNotEmptyException(e);
+        else if (e instanceof PathExistsException)
+            return new IgfsPathAlreadyExistsException(msg, e);
+        else
+            return new IgfsException(msg, e);
+    }
+
+    /**
+     * Convert Hadoop FileStatus properties to map.
+     *
+     * @param status File status.
+     * @return GGFS attributes.
+     */
+    private static Map<String, String> properties(FileStatus status) {
+        FsPermission perm = status.getPermission();
+
+        if (perm == null)
+            perm = FsPermission.getDefault();
+
+        return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(),
+            PROP_GROUP_NAME, status.getGroup());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exists(IgfsPath path) {
+        try {
+            return fileSys.exists(convert(path));
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
+
+        try {
+            if (props0.userName() != null || props0.groupName() != null)
+                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+
+            if (props0.permission() != null)
+                fileSys.setPermission(convert(path), props0.permission());
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
+        }
+
+        //Result is not used in case of secondary FS.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(IgfsPath src, IgfsPath dest) {
+        // Delegate to the secondary file system.
+        try {
+            if (!fileSys.rename(convert(src), convert(dest)))
+                throw new IgfsException("Failed to rename (secondary file system returned false) " +
+                    "[src=" + src + ", dest=" + dest + ']');
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(IgfsPath path, boolean recursive) {
+        try {
+            return fileSys.delete(convert(path), recursive);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path) {
+        try {
+            if (!fileSys.mkdirs(convert(path)))
+                throw new IgniteException("Failed to make directories [path=" + path + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+        // Properties are optional: fall back to an empty map in the same way create(...) does,
+        // otherwise a null map would end up in NullPointerException.
+        IgfsHadoopFSProperties props0 =
+            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+        try {
+            if (!fileSys.mkdirs(convert(path), props0.permission()))
+                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+        try {
+            FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+            if (statuses == null)
+                throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+
+            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
+
+            for (FileStatus status : statuses)
+                res.add(new IgfsPath(path, status.getPath().getName()));
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+        try {
+            FileStatus[] statuses = fileSys.listStatus(convert(path));
+
+            if (statuses == null)
+                throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+
+            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
+
+            for (FileStatus status : statuses) {
+                IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) :
+                    new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false,
+                    properties(status));
+
+                res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1));
+            }
+
+            return res;
+        }
+        catch (FileNotFoundException ignored) {
+            throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsReader open(IgfsPath path, int bufSize) {
+        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, boolean overwrite) {
+        try {
+            return fileSys.create(convert(path), overwrite);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+        long blockSize, @Nullable Map<String, String> props) {
+        IgfsHadoopFSProperties props0 =
+            new IgfsHadoopFSProperties(props != null ? props : Collections.<String, String>emptyMap());
+
+        try {
+            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
+                null);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
+                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
+                ", blockSize=" + blockSize + "]");
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * NOTE: {@code create} and {@code props} arguments are not used by this implementation,
+     * the call is delegated to the Hadoop file system with path and buffer size only.
+     */
+    @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+        @Nullable Map<String, String> props) {
+        try {
+            return fileSys.append(convert(path), bufSize);
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) {
+        try {
+            final FileStatus status = fileSys.getFileStatus(convert(path));
+
+            if (status == null)
+                return null;
+
+            final Map<String, String> props = properties(status);
+
+            // Lightweight view over Hadoop file status.
+            return new IgfsFile() {
+                @Override public IgfsPath path() {
+                    return path;
+                }
+
+                @Override public boolean isFile() {
+                    return status.isFile();
+                }
+
+                @Override public boolean isDirectory() {
+                    return status.isDirectory();
+                }
+
+                @Override public int blockSize() {
+                    return (int)status.getBlockSize();
+                }
+
+                @Override public long groupBlockSize() {
+                    return status.getBlockSize();
+                }
+
+                @Override public long accessTime() {
+                    return status.getAccessTime();
+                }
+
+                @Override public long modificationTime() {
+                    return status.getModificationTime();
+                }
+
+                @Override public String property(String name) throws IllegalArgumentException {
+                    String val = props.get(name);
+
+                    if (val == null)
+                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
+
+                    return val;
+                }
+
+                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
+                    String val = props.get(name);
+
+                    return val == null ? dfltVal : val;
+                }
+
+                @Override public long length() {
+                    return status.getLen();
+                }
+
+                /** {@inheritDoc} */
+                @Override public Map<String, String> properties() {
+                    return props;
+                }
+            };
+
+        }
+        catch (FileNotFoundException ignore) {
+            // Missing file is reported to the caller as null rather than as an error.
+            return null;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long usedSpaceSize() {
+        try {
+            return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed();
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<String, String> properties() {
+        return props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        try {
+            fileSys.close();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
new file mode 100644
index 0000000..3d4bda8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * GGFS client future that additionally carries the description of an output buffer
+ * (array, offset and length) and a flag telling whether the future belongs to a read operation.
+ * <p>
+ * All four attributes are plain mutable fields exposed through same-named getter/setter pairs.
+ */
+public class IgfsHadoopFuture<T> extends GridPlainFutureAdapter<T> {
+    /** Buffer the result is expected to be written to; may be {@code null}. */
+    private byte[] outBuf;
+
+    /** Position inside {@link #outBuf} at which writing starts. */
+    private int outOff;
+
+    /** Number of bytes to put into {@link #outBuf}. */
+    private int outLen;
+
+    /** Whether this future was created for a read operation. */
+    private boolean read;
+
+    /**
+     * Sets the output buffer.
+     *
+     * @param outBuf Output buffer, possibly {@code null}.
+     */
+    public void outputBuffer(@Nullable byte[] outBuf) {
+        this.outBuf = outBuf;
+    }
+
+    /**
+     * Gets the output buffer.
+     *
+     * @return Output buffer as last set (initially {@code null}).
+     */
+    public byte[] outputBuffer() {
+        return this.outBuf;
+    }
+
+    /**
+     * Sets the offset in the output buffer to write from.
+     *
+     * @param outOff Offset in output buffer to write from.
+     */
+    public void outputOffset(int outOff) {
+        this.outOff = outOff;
+    }
+
+    /**
+     * Gets the offset in the output buffer to write from.
+     *
+     * @return Offset in output buffer to write from.
+     */
+    public int outputOffset() {
+        return this.outOff;
+    }
+
+    /**
+     * Sets the length to write to the output buffer.
+     *
+     * @param outLen Length to write to output buffer.
+     */
+    public void outputLength(int outLen) {
+        this.outLen = outLen;
+    }
+
+    /**
+     * Gets the length to write to the output buffer.
+     *
+     * @return Length to write to output buffer.
+     */
+    public int outputLength() {
+        return this.outLen;
+    }
+
+    /**
+     * Marks (or unmarks) this future as a read future.
+     *
+     * @param read {@code True} if this is a read future.
+     */
+    public void read(boolean read) {
+        this.read = read;
+    }
+
+    /**
+     * Tells whether this future is a read future.
+     *
+     * @return {@code True} if this is a read future.
+     */
+    public boolean read() {
+        return this.read;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
new file mode 100644
index 0000000..09cb964
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Communication with grid in the same process.
+ */
+public class IgfsHadoopInProc implements IgfsHadoopEx {
+    /** Target GGFS. */
+    private final IgfsEx ggfs;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** Event listeners. */
+    private final Map<IgfsHadoopStreamDelegate, IgfsHadoopStreamEventListener> lsnrs =
+        new ConcurrentHashMap<>();
+
+    /** Logger. */
+    private final Log log;
+
+    /**
+     * Constructor.
+     *
+     * @param ggfs Target GGFS.
+     * @param log Log.
+     */
+    public IgfsHadoopInProc(IgfsEx ggfs, Log log) {
+        this.ggfs = ggfs;
+        this.log = log;
+
+        bufSize = ggfs.configuration().getBlockSize() * 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) {
+        ggfs.clientLogDirectory(logDir);
+
+        return new IgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(), ggfs.groupBlockSize(),
+            ggfs.globalSampling());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        // Perform cleanup.
+        for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) {
+            try {
+                lsnr.onClose();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to notify stream event listener", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return ggfs.info(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        try {
+            return ggfs.update(path, props);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+        try {
+            ggfs.setTimes(path, accessTime, modificationTime);
+
+            return true;
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " +
+                path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+        try {
+            ggfs.rename(src, dest);
+
+            return true;
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+        try {
+            return ggfs.delete(path, recursive);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+        try {
+            return ggfs.globalSpace();
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " +
+                "stopping.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return ggfs.listPaths(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return ggfs.listFiles(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        try {
+            ggfs.mkdirs(path, props);
+
+            return true;
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " +
+                path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return ggfs.summary(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " +
+                path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+        throws IgniteCheckedException {
+        try {
+            return ggfs.affinity(path, start, len);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+        try {
+            IgfsInputStreamAdapter stream = ggfs.open(path, bufSize);
+
+            return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+        throws IgniteCheckedException {
+        try {
+            IgfsInputStreamAdapter stream = ggfs.open(path, bufSize, seqReadsBeforePrefetch);
+
+            return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length());
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+        try {
+            IgfsOutputStream stream = ggfs.create(path, bufSize, overwrite,
+                colocate ? ggfs.nextAffinityKey() : null, replication, blockSize, props);
+
+            return new IgfsHadoopStreamDelegate(this, stream);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException {
+        try {
+            IgfsOutputStream stream = ggfs.append(path, bufSize, create, props);
+
+            return new IgfsHadoopStreamDelegate(this, stream);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate delegate, long pos, int len,
+        @Nullable byte[] outBuf, int outOff, int outLen) {
+        IgfsInputStreamAdapter stream = delegate.target();
+
+        try {
+            byte[] res = null;
+
+            if (outBuf != null) {
+                int outTailLen = outBuf.length - outOff;
+
+                if (len <= outTailLen)
+                    stream.readFully(pos, outBuf, outOff, len);
+                else {
+                    stream.readFully(pos, outBuf, outOff, outTailLen);
+
+                    int remainderLen = len - outTailLen;
+
+                    res = new byte[remainderLen];
+
+                    stream.readFully(pos, res, 0, remainderLen);
+                }
+            } else {
+                res = new byte[len];
+
+                stream.readFully(pos, res, 0, len);
+            }
+
+            return new GridPlainFutureAdapter<>(res);
+        }
+        catch (IllegalStateException | IOException e) {
+            IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
+
+            if (lsnr != null)
+                lsnr.onError(e.getMessage());
+
+            return new GridPlainFutureAdapter<>(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len)
+        throws IOException {
+        try {
+            IgfsOutputStream stream = delegate.target();
+
+            stream.write(data, off, len);
+        }
+        catch (IllegalStateException | IOException e) {
+            IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
+
+            if (lsnr != null)
+                lsnr.onError(e.getMessage());
+
+            if (e instanceof IllegalStateException)
+                throw new IOException("Failed to write data to GGFS stream because Grid is stopping.", e);
+            else
+                throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException {
+        try {
+            IgfsOutputStream stream = delegate.target();
+
+            stream.flush();
+        }
+        catch (IllegalStateException | IOException e) {
+            IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate);
+
+            if (lsnr != null)
+                lsnr.onError(e.getMessage());
+
+            if (e instanceof IllegalStateException)
+                throw new IOException("Failed to flush data to GGFS stream because Grid is stopping.", e);
+            else
+                throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException {
+        Closeable closeable = desc.target();
+
+        try {
+            closeable.close();
+        }
+        catch (IllegalStateException e) {
+            throw new IOException("Failed to close GGFS stream because Grid is stopping.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addEventListener(IgfsHadoopStreamDelegate delegate,
+        IgfsHadoopStreamEventListener lsnr) {
+        IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
+
+        assert lsnr0 == null || lsnr0 == lsnr;
+
+        if (log.isDebugEnabled())
+            log.debug("Added stream event listener [delegate=" + delegate + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) {
+        IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate);
+
+        if (lsnr0 != null && log.isDebugEnabled())
+            log.debug("Removed stream event listener [delegate=" + delegate + ']');
+    }
+}


Mime
View raw message