ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [20/46] ignite git commit: IGNITE-3953: Hadoop: Merged back both modules.
Date Mon, 26 Sep 2016 07:17:16 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
new file mode 100644
index 0000000..4dc3c7f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutProc.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
+import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest;
+import org.apache.ignite.internal.igfs.common.IgfsMessage;
+import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
+import org.apache.ignite.internal.igfs.common.IgfsStatusRequest;
+import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.lang.IgniteClosure;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.AFFINITY;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.CLOSE;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.DELETE;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.PATH_SUMMARY;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.READ_BLOCK;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.RENAME;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.SET_TIMES;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.UPDATE;
+import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.WRITE_BLOCK;
+
+/**
+ * Communication with external process (TCP or shmem).
+ */
+public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
+    /** Expected result is boolean. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
+
+    /** Expected result is boolean. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Long> LONG_RES = createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
+
+    /** Expected result is {@code IgfsHandshakeResponse} */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
+        IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
+
+    /** Expected result is {@code IgfsStatus} */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
+        createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
+        IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
+        Collection<IgfsFile>> FILE_COL_RES = createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
+        Collection<IgfsPath>> PATH_COL_RES = createClosure();
+
+    /** Expected result is {@code IgfsPathSummary}. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
+        createClosure();
+
+    /** Expected result is {@code IgfsFile}. */
+    private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
+        Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
+
+    /** Grid name. */
+    private final String grid;
+
+    /** IGFS name. */
+    private final String igfs;
+
+    /** The user this out proc is performing on behalf of. */
+    private final String userName;
+
+    /** Client log. */
+    private final Log log;
+
+    /** Client IO. */
+    private final HadoopIgfsIpcIo io;
+
+    /** Event listeners. */
+    private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>();
+
+    /**
+     * Constructor for TCP endpoint.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param grid Grid name.
+     * @param igfs IGFS name.
+     * @param log Client logger.
+     * @throws IOException If failed.
+     */
+    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(host, port, grid, igfs, false, log, user);
+    }
+
+    /**
+     * Constructor for shmem endpoint.
+     *
+     * @param port Port.
+     * @param grid Grid name.
+     * @param igfs IGFS name.
+     * @param log Client logger.
+     * @throws IOException If failed.
+     */
+    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(null, port, grid, igfs, true, log, user);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param grid Grid name.
+     * @param igfs IGFS name.
+     * @param shmem Shared memory flag.
+     * @param log Client logger.
+     * @throws IOException If failed.
+     */
+    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
+        throws IOException {
+        assert host != null && !shmem || host == null && shmem :
+            "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
+
+        String endpoint = host != null ? host + ":" + port : "shmem:" + port;
+
+        this.grid = grid;
+        this.igfs = igfs;
+        this.log = log;
+        this.userName = IgfsUtils.fixUserName(user);
+
+        io = HadoopIgfsIpcIo.get(log, endpoint);
+
+        io.addEventListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
+        final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
+
+        req.gridName(grid);
+        req.igfsName(igfs);
+        req.logDirectory(logDir);
+
+        return io.send(req).chain(HANDSHAKE_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        assert io != null;
+
+        io.removeEventListener(this);
+
+        if (force)
+            io.forceClose();
+        else
+            io.release();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(INFO);
+        msg.path(path);
+        msg.userName(userName);
+
+        return io.send(msg).chain(FILE_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(UPDATE);
+        msg.path(path);
+        msg.properties(props);
+        msg.userName(userName);
+
+        return io.send(msg).chain(FILE_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(SET_TIMES);
+        msg.path(path);
+        msg.accessTime(accessTime);
+        msg.modificationTime(modificationTime);
+        msg.userName(userName);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(RENAME);
+        msg.path(src);
+        msg.destinationPath(dest);
+        msg.userName(userName);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(DELETE);
+        msg.path(path);
+        msg.flag(recursive);
+        msg.userName(userName);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+        throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(AFFINITY);
+        msg.path(path);
+        msg.start(start);
+        msg.length(len);
+        msg.userName(userName);
+
+        return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(PATH_SUMMARY);
+        msg.path(path);
+        msg.userName(userName);
+
+        return io.send(msg).chain(SUMMARY_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(MAKE_DIRECTORIES);
+        msg.path(path);
+        msg.properties(props);
+        msg.userName(userName);
+
+        return io.send(msg).chain(BOOL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(LIST_FILES);
+        msg.path(path);
+        msg.userName(userName);
+
+        return io.send(msg).chain(FILE_COL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(LIST_PATHS);
+        msg.path(path);
+        msg.userName(userName);
+
+        return io.send(msg).chain(PATH_COL_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+        return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_READ);
+        msg.path(path);
+        msg.flag(false);
+        msg.userName(userName);
+
+        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(IgfsPath path,
+        int seqReadsBeforePrefetch) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_READ);
+        msg.path(path);
+        msg.flag(true);
+        msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+        msg.userName(userName);
+
+        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_CREATE);
+        msg.path(path);
+        msg.flag(overwrite);
+        msg.colocate(colocate);
+        msg.properties(props);
+        msg.replication(replication);
+        msg.blockSize(blockSize);
+        msg.userName(userName);
+
+        Long streamId = io.send(msg).chain(LONG_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, streamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException {
+        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
+
+        msg.command(OPEN_APPEND);
+        msg.path(path);
+        msg.flag(create);
+        msg.properties(props);
+        msg.userName(userName);
+
+        Long streamId = io.send(msg).chain(LONG_RES).get();
+
+        return new HadoopIgfsStreamDelegate(this, streamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
+        final @Nullable byte[] outBuf, final int outOff, final int outLen) {
+        assert len > 0;
+
+        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+        msg.command(READ_BLOCK);
+        msg.streamId((long) desc.target());
+        msg.position(pos);
+        msg.length(len);
+
+        try {
+            return io.send(msg, outBuf, outOff, outLen);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
+        throws IOException {
+        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+        msg.command(WRITE_BLOCK);
+        msg.streamId((long) desc.target());
+        msg.data(data);
+        msg.position(off);
+        msg.length(len);
+
+        try {
+            io.sendPlain(msg);
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
+        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
+
+        msg.command(CLOSE);
+        msg.streamId((long)desc.target());
+
+        try {
+            io.send(msg).chain(BOOL_RES).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addEventListener(HadoopIgfsStreamDelegate desc,
+        HadoopIgfsStreamEventListener lsnr) {
+        long streamId = desc.target();
+
+        HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
+
+        assert lsnr0 == null || lsnr0 == lsnr;
+
+        if (log.isDebugEnabled())
+            log.debug("Added stream event listener [streamId=" + streamId + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
+        long streamId = desc.target();
+
+        HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
+
+        if (lsnr0 != null && log.isDebugEnabled())
+            log.debug("Removed stream event listener [streamId=" + streamId + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
+            try {
+                lsnr.onClose();
+            }
+            catch (IgniteCheckedException e) {
+                log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(long streamId, String errMsg) {
+        HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
+
+        if (lsnr != null)
+            lsnr.onError(errMsg);
+        else
+            log.warn("Received write error response for not registered output stream (will ignore) " +
+                "[streamId= " + streamId + ']');
+    }
+
+    /**
+     * Creates conversion closure for given type.
+     *
+     * @param <T> Type of expected result.
+     * @return Conversion closure.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> IgniteClosure<IgniteInternalFuture<IgfsMessage>, T> createClosure() {
+        return new IgniteClosure<IgniteInternalFuture<IgfsMessage>, T>() {
+            @Override public T apply(IgniteInternalFuture<IgfsMessage> fut) {
+                try {
+                    IgfsControlResponse res = (IgfsControlResponse)fut.get();
+
+                    if (res.hasError())
+                        res.throwError();
+
+                    return (T)res.response();
+                }
+                catch (IgfsException | IgniteCheckedException e) {
+                    throw new GridClosureException(e);
+                }
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return userName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutputStream.java
new file mode 100644
index 0000000..7f95a6b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsOutputStream.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * IGFS Hadoop output stream implementation.
+ */
+public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener {
+    /** Log instance. */
+    private Log log;
+
+    /** Client logger. */
+    private IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private long logStreamId;
+
+    /** Server stream delegate. */
+    private HadoopIgfsStreamDelegate delegate;
+
+    /** Closed flag. */
+    private volatile boolean closed;
+
+    /** Flag set if stream was closed due to connection breakage. */
+    private boolean connBroken;
+
+    /** Error message. */
+    private volatile String errMsg;
+
+    /** Read time. */
+    private long writeTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of written bytes. */
+    private long total;
+
+    /**
+     * Creates light output stream.
+     *
+     * @param delegate Server stream delegate.
+     * @param log Logger to use.
+     * @param clientLog Client logger.
+     */
+    public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log,
+        IgfsLogger clientLog, long logStreamId) {
+        this.delegate = delegate;
+        this.log = log;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+
+        delegate.hadoop().addEventListener(delegate, this);
+    }
+
+    /**
+     * Read start.
+     */
+    private void writeStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void writeEnd() {
+        long now = System.nanoTime();
+
+        writeTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
+        check();
+
+        writeStart();
+
+        try {
+            delegate.hadoop().writeData(delegate, b, off, len);
+
+            total += len;
+        }
+        finally {
+            writeEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(int b) throws IOException {
+        write(new byte[] {(byte)b});
+
+        total++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flush() throws IOException {
+        delegate.hadoop().flush(delegate);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        if (!closed) {
+            if (log.isDebugEnabled())
+                log.debug("Closing output stream: " + delegate);
+
+            writeStart();
+
+            delegate.hadoop().closeStream(delegate);
+
+            markClosed(false);
+
+            writeEnd();
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+
+            if (log.isDebugEnabled())
+                log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 +
+                    ", userTime=" + userTime / 1000 + ']');
+        }
+        else if(connBroken)
+            throw new IOException(
+                "Failed to close stream, because connection was broken (data could have been lost).");
+    }
+
+    /**
+     * Marks stream as closed.
+     *
+     * @param connBroken {@code True} if connection with server was lost.
+     */
+    private void markClosed(boolean connBroken) {
+        // It is ok to have race here.
+        if (!closed) {
+            closed = true;
+
+            delegate.hadoop().removeEventListener(delegate);
+
+            this.connBroken = connBroken;
+        }
+    }
+
+    /**
+     * @throws IOException If check failed.
+     */
+    private void check() throws IOException {
+        String errMsg0 = errMsg;
+
+        if (errMsg0 != null)
+            throw new IOException(errMsg0);
+
+        if (closed) {
+            if (connBroken)
+                throw new IOException("Server connection was lost.");
+            else
+                throw new IOException("Stream is closed.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() throws IgniteCheckedException {
+        markClosed(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(String errMsg) {
+        this.errMsg = errMsg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProperties.java
new file mode 100644
index 0000000..5427bf1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProperties.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+
+import java.util.Map;
+
+/**
+ * Hadoop file system properties.
+ */
+public class HadoopIgfsProperties {
+    /** Username. */
+    private String usrName;
+
+    /** Group name. */
+    private String grpName;
+
+    /** Permissions. */
+    private FsPermission perm;
+
+    /**
+     * Constructor.
+     *
+     * @param props Properties.
+     * @throws IgniteException In case of error.
+     */
+    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
+        usrName = props.get(IgfsUtils.PROP_USER_NAME);
+        grpName = props.get(IgfsUtils.PROP_GROUP_NAME);
+
+        String permStr = props.get(IgfsUtils.PROP_PERMISSION);
+
+        if (permStr != null) {
+            try {
+                perm = new FsPermission((short)Integer.parseInt(permStr, 8));
+            }
+            catch (NumberFormatException ignore) {
+                throw new IgniteException("Permissions cannot be parsed: " + permStr);
+            }
+        }
+    }
+
+    /**
+     * Get user name.
+     *
+     * @return User name.
+     */
+    public String userName() {
+        return usrName;
+    }
+
+    /**
+     * Get group name.
+     *
+     * @return Group name.
+     */
+    public String groupName() {
+        return grpName;
+    }
+
+    /**
+     * Get permission.
+     *
+     * @return Permission.
+     */
+    public FsPermission permission() {
+        return perm;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyInputStream.java
new file mode 100644
index 0000000..133e207
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyInputStream.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Secondary Hadoop file system input stream wrapper.
+ */
+public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
+    /** Actual input stream to the secondary file system. */
+    private final FSDataInputStream is;
+
+    /** Client logger. */
+    private final IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private final long logStreamId;
+
+    /** Read time. */
+    private long readTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of read bytes. */
+    private long total;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     *
+     * @param is Actual input stream to the secondary file system.
+     * @param clientLog Client log.
+     */
+    public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
+        assert is != null;
+        assert clientLog != null;
+
+        this.is = is;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(byte[] b) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read(b);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = super.read(b, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long skip(long n) throws IOException {
+        readStart();
+
+        long res;
+
+        try {
+            res =  is.skip(n);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSkip(logStreamId, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int available() throws IOException {
+        readStart();
+
+        try {
+            return is.available();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            readStart();
+
+            try {
+                is.close();
+            }
+            finally {
+                readEnd();
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void mark(int readLimit) {
+        readStart();
+
+        try {
+            is.mark(readLimit);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logMark(logStreamId, readLimit);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void reset() throws IOException {
+        readStart();
+
+        try {
+            is.reset();
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logReset(logStreamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean markSupported() {
+        readStart();
+
+        try {
+            return is.markSupported();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read() throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read();
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total++;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read(pos, buf, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+        readStart();
+
+        try {
+            is.readFully(pos, buf, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        total += len;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
+        readStart();
+
+        try {
+            is.readFully(pos, buf);
+        }
+        finally {
+            readEnd();
+        }
+
+        total += buf.length;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void seek(long pos) throws IOException {
+        readStart();
+
+        try {
+            is.seek(pos);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSeek(logStreamId, pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long getPos() throws IOException {
+        readStart();
+
+        try {
+            return is.getPos();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+        readStart();
+
+        try {
+            return is.seekToNewSource(targetPos);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /**
+     * Read start.
+     */
+    private void readStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void readEnd() {
+        long now = System.nanoTime();
+
+        readTime += now - lastTs;
+
+        lastTs = now;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyOutputStream.java
new file mode 100644
index 0000000..8917a95
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsProxyOutputStream.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Secondary Hadoop file system output stream wrapper.
+ */
+public class HadoopIgfsProxyOutputStream extends OutputStream {
+    /** Actual output stream. */
+    private FSDataOutputStream os;
+
+    /** Client logger. */
+    private final IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private final long logStreamId;
+
+    /** Read time. */
+    private long writeTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of written bytes. */
+    private long total;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     *
+     * @param os Actual output stream.
+     * @param clientLog Client logger.
+     * @param logStreamId Log stream ID.
+     */
+    public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
+        assert os != null;
+        assert clientLog != null;
+
+        this.os = os;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(int b) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total += b.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b, off, len);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total += len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void flush() throws IOException {
+        writeStart();
+
+        try {
+            os.flush();
+        }
+        finally {
+            writeEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            writeStart();
+
+            try {
+                os.close();
+            }
+            finally {
+                writeEnd();
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+        }
+    }
+
+    /**
+     * Read start.
+     */
+    private void writeStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void writeEnd() {
+        long now = System.nanoTime();
+
+        writeTime += now - lastTs;
+
+        lastTs = now;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
new file mode 100644
index 0000000..1a4add5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.io.IOException;
+
+/**
+ * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly
+ * requested.
+ * <p>
+ * The class is expected to be used only from synchronized context and therefore is not tread-safe.
+ */
+public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable {
+    /** Secondary file system. */
+    private final FileSystem fs;
+
+    /** Path to the file to open. */
+    private final Path path;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** Actual input stream. */
+    private FSDataInputStream in;
+
+    /** Cached error occurred during output stream open. */
+    private IOException err;
+
+    /** Flag indicating that the stream was already opened. */
+    private boolean opened;
+
+    /**
+     * Constructor.
+     *
+     * @param fs Secondary file system.
+     * @param path Path to the file to open.
+     * @param bufSize Buffer size.
+     */
+    public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) {
+        assert fs != null;
+        assert path != null;
+
+        this.fs = fs;
+        this.path = path;
+        this.bufSize = bufSize;
+    }
+
+    /** Get input stream. */
+    private PositionedReadable in() throws IOException {
+        if (opened) {
+            if (err != null)
+                throw err;
+        }
+        else {
+            opened = true;
+
+            try {
+                in = fs.open(path, bufSize);
+
+                if (in == null)
+                    throw new IOException("Failed to open input stream (file system returned null): " + path);
+            }
+            catch (IOException e) {
+                err = e;
+
+                throw err;
+            }
+        }
+
+        return in;
+    }
+
+    /**
+     * Close wrapped input stream in case it was previously opened.
+     */
+    @Override public void close() {
+        U.closeQuiet(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+        return in().read(pos, buf, off, len);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamDelegate.java
new file mode 100644
index 0000000..e6f8061
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamDelegate.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * IGFS Hadoop stream descriptor.
+ */
+public class HadoopIgfsStreamDelegate {
+    /** RPC handler. */
+    private final HadoopIgfsEx hadoop;
+
+    /** Target. */
+    private final Object target;
+
+    /** Optional stream length. */
+    private final long len;
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     */
+    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
+        this(hadoop, target, -1);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     * @param len Optional length.
+     */
+    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
+        assert hadoop != null;
+        assert target != null;
+
+        this.hadoop = hadoop;
+        this.target = target;
+        this.len = len;
+    }
+
+    /**
+     * @return RPC handler.
+     */
+    public HadoopIgfsEx hadoop() {
+        return hadoop;
+    }
+
+    /**
+     * @return Stream target.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T target() {
+        return (T) target;
+    }
+
+    /**
+     * @return Length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return System.identityHashCode(target);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
+            target == ((HadoopIgfsStreamDelegate)obj).target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsStreamDelegate.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamEventListener.java
new file mode 100644
index 0000000..ee46ed4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsStreamEventListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * IGFS input stream event listener.
+ */
+public interface HadoopIgfsStreamEventListener {
+    /**
+     * Callback invoked when the stream is being closed.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onClose() throws IgniteCheckedException;
+
+    /**
+     * Callback invoked when remote error occurs.
+     *
+     * @param errMsg Error message.
+     */
+    public void onError(String errMsg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsUtils.java
new file mode 100644
index 0000000..ee7756e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
+import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
+import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
+import org.apache.ignite.igfs.IgfsPathNotFoundException;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Utility constants and methods for IGFS Hadoop file system.
+ */
+public class HadoopIgfsUtils {
+    /** Parameter name for endpoint no embed mode flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
+
+    /** Parameter name for endpoint no shared memory flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
+
+    /** Parameter name for endpoint no local TCP flag. */
+    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
+
+    /**
+     * Get string parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return String value.
+     */
+    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
+        return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
+    }
+
+    /**
+     * Get integer parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return Integer value.
+     * @throws IOException In case of parse exception.
+     */
+    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
+        String name0 = String.format(name, authority != null ? authority : "");
+
+        try {
+            return cfg.getInt(name0, dflt);
+        }
+        catch (NumberFormatException ignore) {
+            throw new IOException("Failed to parse parameter value to integer: " + name0);
+        }
+    }
+
+    /**
+     * Get boolean parameter.
+     *
+     * @param cfg Configuration.
+     * @param name Parameter name.
+     * @param authority Authority.
+     * @param dflt Default value.
+     * @return Boolean value.
+     */
+    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
+        return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
+    }
+
+    /**
+     * Cast Ignite exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @return Casted exception.
+     */
+    public static IOException cast(IgniteCheckedException e) {
+        return cast(e, null);
+    }
+
+    /**
+     * Cast Ignite exception to appropriate IO exception.
+     *
+     * @param e Exception to cast.
+     * @param path Path for exceptions.
+     * @return Casted exception.
+     */
+    @SuppressWarnings("unchecked")
+    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
+        assert e != null;
+
+        // First check for any nested IOException; if exists - re-throw it.
+        if (e.hasCause(IOException.class))
+            return e.getCause(IOException.class);
+        else if (e.hasCause(IgfsPathNotFoundException.class))
+            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
+        else if (e.hasCause(IgfsParentNotDirectoryException.class))
+            return new ParentNotDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
+            return new PathIsNotEmptyDirectoryException(path);
+        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
+            return new PathExistsException(path);
+        else {
+            String msg = e.getMessage();
+
+            return msg == null ? new IOException(e) : new IOException(msg, e);
+        }
+    }
+
+    /**
+     * Deletes all files from the given file system.
+     *
+     * @param fs The file system to clean up.
+     * @throws IOException On error.
+     */
+    public static void clear(FileSystem fs) throws IOException {
+        // Delete root contents:
+        FileStatus[] statuses = fs.listStatus(new Path("/"));
+
+        if (statuses != null) {
+            for (FileStatus stat: statuses)
+                fs.delete(stat.getPath(), true);
+        }
+    }
+
+    /**
+     * Deletes all files from the given file system.
+     *
+     * @param fs The file system to clean up.
+     * @throws IOException On error.
+     */
+    public static void clear(AbstractFileSystem fs) throws IOException {
+        // Delete root contents:
+        FileStatus[] statuses = fs.listStatus(new Path("/"));
+
+        if (statuses != null) {
+            for (FileStatus stat: statuses)
+                fs.delete(stat.getPath(), true);
+        }
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopIgfsUtils() {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
new file mode 100644
index 0000000..1fda1c3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsWrapper.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.IgniteState.STARTED;
+import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
+
+/**
+ * Wrapper for IGFS server.
+ */
+public class HadoopIgfsWrapper implements HadoopIgfs {
+    /** Delegate. */
+    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
+
+    /** Authority. */
+    private final String authority;
+
+    /** Connection string. */
+    private final HadoopIgfsEndpoint endpoint;
+
+    /** Log directory. */
+    private final String logDir;
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Logger. */
+    private final Log log;
+
+    /** The user name this wrapper works on behalf of. */
+    private final String userName;
+
+    /**
+     * Constructor.
+     *
+     * @param authority Authority (connection string).
+     * @param logDir Log directory for server.
+     * @param conf Configuration.
+     * @param log Current logger.
+     */
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+        throws IOException {
+        try {
+            this.authority = authority;
+            this.endpoint = new HadoopIgfsEndpoint(authority);
+            this.logDir = logDir;
+            this.conf = conf;
+            this.log = log;
+            this.userName = user;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to parse endpoint: " + authority, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) {
+                return hndResp;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        Delegate delegate = delegateRef.get();
+
+        if (delegate != null && delegateRef.compareAndSet(delegate, null))
+            delegate.close(force);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.info(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
+            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.update(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.setTimes(path, accessTime, modificationTime);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.rename(src, dest);
+            }
+        }, src);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.delete(path, recursive);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
+        final long len) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
+            @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.affinity(path, start, len);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
+            @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.contentSummary(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Boolean>() {
+            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.mkdirs(path, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
+            @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listFiles(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
+            @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.listPaths(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IOException {
+        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
+            @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
+                throws IgniteCheckedException, IOException {
+                return hadoop.fsStatus();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.open(path, seqReadsBeforePrefetch);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
+        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
+        throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
+            }
+        }, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        @Nullable final Map<String, String> props) throws IOException {
+        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
+            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
+                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
+                return hadoop.append(path, create, props);
+            }
+        }, path);
+    }
+
+    /**
+     * Execute closure which is not path-specific.
+     *
+     * @param clo Closure.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
+        return withReconnectHandling(clo, null);
+    }
+
+    /**
+     * Execute closure.
+     *
+     * @param clo Closure.
+     * @param path Path for exceptions.
+     * @return Result.
+     * @throws IOException If failed.
+     */
+    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
+        throws IOException {
+        Exception err = null;
+
+        for (int i = 0; i < 2; i++) {
+            Delegate curDelegate = null;
+
+            boolean close = false;
+            boolean force = false;
+
+            try {
+                curDelegate = delegate();
+
+                assert curDelegate != null;
+
+                close = curDelegate.doomed;
+
+                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
+            }
+            catch (HadoopIgfsCommunicationException e) {
+                if (curDelegate != null && !curDelegate.doomed) {
+                    // Try getting rid fo faulty delegate ASAP.
+                    delegateRef.compareAndSet(curDelegate, null);
+
+                    close = true;
+                    force = true;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send message to a server: " + e);
+
+                err = e;
+            }
+            catch (IgniteCheckedException e) {
+                throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
+            }
+            finally {
+                if (close) {
+                    assert curDelegate != null;
+
+                    curDelegate.close(force);
+                }
+            }
+        }
+
+        List<Throwable> list = X.getThrowableList(err);
+
+        Throwable cause = list.get(list.size() - 1);
+
+        throw new IOException("Failed to communicate with IGFS: "
+            + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err);
+    }
+
+    /**
+     * Get delegate creating it if needed.
+     *
+     * @return Delegate.
+     */
+    private Delegate delegate() throws HadoopIgfsCommunicationException {
+        // These fields will contain possible exceptions from shmem and TCP endpoints.
+        Exception errShmem = null;
+        Exception errTcp = null;
+
+        // 1. If delegate is set, return it immediately.
+        Delegate curDelegate = delegateRef.get();
+
+        if (curDelegate != null)
+            return curDelegate;
+
+        // 2. Guess that we are in the same VM.
+        boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false);
+
+        if (!skipInProc) {
+            IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs());
+
+            if (igfs != null) {
+                HadoopIgfsEx hadoop = null;
+
+                try {
+                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
+
+                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    if (e instanceof HadoopIgfsCommunicationException)
+                        if (hadoop != null)
+                            hadoop.close(true);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
+                }
+            }
+        }
+
+        // 3. Try connecting using shmem.
+        boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
+
+        if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
+
+                errShmem = e;
+            }
+        }
+
+        // 4. Try local TCP connection.
+        boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
+
+        if (curDelegate == null && !skipLocTcp) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+                        ", port=" + endpoint.port() + ']', e);
+
+                errTcp = e;
+            }
+        }
+
+        // 5. Try remote TCP connection.
+        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
+            HadoopIgfsEx hadoop = null;
+
+            try {
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
+
+                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
+            }
+            catch (IOException | IgniteCheckedException e) {
+                if (e instanceof HadoopIgfsCommunicationException)
+                    hadoop.close(true);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
+                        ", port=" + endpoint.port() + ']', e);
+
+                errTcp = e;
+            }
+        }
+
+        if (curDelegate != null) {
+            if (!delegateRef.compareAndSet(null, curDelegate))
+                curDelegate.doomed = true;
+
+            return curDelegate;
+        }
+        else {
+            SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=[");
+
+            if (errShmem != null)
+                errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
+
+            errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
+
+            errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " +
+                "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
+
+            throw new HadoopIgfsCommunicationException(errMsg.toString());
+        }
+    }
+
+    /**
+     * File system operation closure.
+     */
+    private static interface FileSystemClosure<T> {
+        /**
+         * Call closure body.
+         *
+         * @param hadoop RPC handler.
+         * @param hndResp Handshake response.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         * @throws IOException If failed.
+         */
+        public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
+    }
+
+    /**
+     * Delegate.
+     */
+    private static class Delegate {
+        /** RPC handler. */
+        private final HadoopIgfsEx hadoop;
+
+        /** Handshake request. */
+        private final IgfsHandshakeResponse hndResp;
+
+        /** Close guard. */
+        private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+        /** Whether this delegate must be closed at the end of the next invocation. */
+        private boolean doomed;
+
+        /**
+         * Constructor.
+         *
+         * @param hadoop Hadoop.
+         * @param hndResp Handshake response.
+         */
+        private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
+            this.hadoop = hadoop;
+            this.hndResp = hndResp;
+        }
+
+        /**
+         * Close underlying RPC handler.
+         *
+         * @param force Force flag.
+         */
+        private void close(boolean force) {
+            if (closeGuard.compareAndSet(false, true))
+                hadoop.close(force);
+        }
+    }
+
+    /**
+     * Helper method to find Igfs of the given name in the given Ignite instance.
+     *
+     * @param gridName The name of the grid to check.
+     * @param igfsName The name of Igfs.
+     * @return The file system instance, or null if not found.
+     */
+    private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) {
+        if (Ignition.state(gridName) == STARTED) {
+            try {
+                for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) {
+                    if (F.eq(fs.name(), igfsName))
+                        return (IgfsEx)fs;
+                }
+            }
+            catch (IgniteIllegalStateException ignore) {
+                // May happen if the grid state has changed:
+            }
+        }
+
+        return null;
+    }
+}
\ No newline at end of file


Mime
View raw message