ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Date Fri, 29 May 2015 12:32:12 GMT
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35388195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35388195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35388195

Branch: refs/heads/ignite-sprint-5
Commit: 353881951fcdcc16c3dc31d808d3af6c263f74ce
Parents: 7ec4c82
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Fri May 29 15:31:35 2015 +0300
Committer: iveselovskiy <iveselovskiy@gridgain.com>
Committed: Fri May 29 15:31:35 2015 +0300

----------------------------------------------------------------------
 .../igfs/secondary/IgfsSecondaryFileSystem.java |   7 +
 .../internal/igfs/common/IgfsMarshaller.java    |  35 +---
 .../igfs/common/IgfsPathControlRequest.java     |  22 +++
 .../internal/processors/hadoop/HadoopJob.java   |   2 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |   8 +-
 .../internal/processors/igfs/IgfsImpl.java      |   8 +-
 .../processors/igfs/IgfsIpcHandler.java         | 184 ++++++++++---------
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   9 +-
 .../internal/processors/igfs/IgfsServer.java    |   4 +-
 .../internal/processors/igfs/IgfsUtils.java     |  16 ++
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 ++++++++++++-----
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 107 +++++++----
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  32 +++-
 .../internal/processors/hadoop/HadoopUtils.java |  10 +-
 .../hadoop/SecondaryFileSystemProvider.java     |  53 +++---
 .../hadoop/fs/HadoopDistributedFileSystem.java  |  91 ---------
 .../hadoop/fs/HadoopFileSystemsUtils.java       |  17 --
 .../processors/hadoop/igfs/HadoopIgfsEx.java    |   6 +
 .../hadoop/igfs/HadoopIgfsInProc.java           | 170 ++++++++++++-----
 .../processors/hadoop/igfs/HadoopIgfsIpcIo.java |   2 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |  33 +++-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |  19 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |   4 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |  56 ++++--
 ...oopSecondaryFileSystemConfigurationTest.java |   4 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |  63 +++++--
 .../IgniteHadoopFileSystemClientSelfTest.java   |   2 +-
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   2 +
 .../hadoop/HadoopFileSystemsTest.java           |  23 +--
 .../collections/HadoopSkipListSelfTest.java     |   4 +-
 30 files changed, 684 insertions(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
      * @return Map of properties.
      */
     public Map<String,String> properties();
+
+
+    /**
+     * Closes the secondary file system.
+     * @throws IgniteException in case of an error.
+     */
+    public void close() throws IgniteException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
     }
 
     /**
+     * Serializes the message and sends it into the given output stream.
      * @param msg Message.
      * @param hdr Message header.
      * @param out Output.
@@ -119,6 +120,7 @@ public class IgfsMarshaller {
 
                     IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
+                    U.writeString(out, req.userName());
                     writePath(out, req.path());
                     writePath(out, req.destinationPath());
                     out.writeBoolean(req.flag());
@@ -236,6 +238,7 @@ public class IgfsMarshaller {
                 case OPEN_CREATE: {
                     IgfsPathControlRequest req = new IgfsPathControlRequest();
 
+                    req.userName(U.readString(in));
                     req.path(readPath(in));
                     req.destinationPath(readPath(in));
                     req.flag(in.readBoolean());
@@ -298,8 +301,6 @@ public class IgfsMarshaller {
                 }
             }
 
-            assert msg != null;
-
             msg.command(cmd);
 
             return msg;
@@ -341,34 +342,4 @@ public class IgfsMarshaller {
 
         return null;
     }
-
-    /**
-     * Writes string to output.
-     *
-     * @param out Data output.
-     * @param str String.
-     * @throws IOException If write failed.
-     */
-    private void writeString(DataOutput out, @Nullable String str) throws IOException {
-        out.writeBoolean(str != null);
-
-        if (str != null)
-            out.writeUTF(str);
-    }
-
-    /**
-     * Reads string from input.
-     *
-     * @param in Data input.
-     * @return Read string.
-     * @throws IOException If read failed.
-     */
-    @Nullable private String readString(DataInput in) throws IOException {
-        boolean hasStr = in.readBoolean();
-
-        if (hasStr)
-            return in.readUTF();
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.igfs.common;
 
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage {
     /** Last modification time. */
     private long modificationTime;
 
+    /** The user name this control request is made on behalf of. */
+    private String userName;
+
     /**
      * @param path Path.
      */
@@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage {
     @Override public String toString() {
         return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
     }
+
+    /**
+     * Getter for the user name.
+     * @return user name.
+     */
+    public final String userName() {
+        assert userName != null;
+
+        return userName;
+    }
+
+    /**
+     * Setter for the user name.
+     * @param userName the user name.
+     */
+    public final void userName(String userName) {
+        this.userName = IgfsUtils.fixUserName(userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
     /**
      * Cleans up the job staging directory.
      */
-    void cleanupStagingDirectory();
+    public void cleanupStagingDirectory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 7c1a837..361f75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
     /** Property name for URI of file system. */
     public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
 
-    /** Property name for user name of file system. */
-    public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
+    /** Property name for default user name of file system.
+     * NOTE: for secondary file system this is just a default user name, which is used
+     * when the 2ndary filesystem is used outside of any user context.
+     * If another user name is set in the context, 2ndary file system will work on behalf
+     * of that user, which is different from the default. */
+     public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
 
     /**
      * Stops IGFS cleaning all used resources.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 34636d2..c3495e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx {
             for (IgfsFileWorkerBatch batch : workerMap.values())
                 batch.cancel();
 
-            if (secondaryFs instanceof AutoCloseable)
-                U.closeQuiet((AutoCloseable)secondaryFs);
+            try {
+                secondaryFs.close();
+            }
+            catch (Exception e) {
+                log.error("Failed to close secondary file system.", e);
+            }
         }
 
         igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..cfe6ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
     private final int bufSize; // Buffer size. Must not be less then file block size.
 
     /** IGFS instance for this handler. */
-    private IgfsEx igfs;
+    private final IgfsEx igfs;
 
     /** Resource ID generator. */
-    private AtomicLong rsrcIdGen = new AtomicLong();
+    private final AtomicLong rsrcIdGen = new AtomicLong();
 
     /** Stopping flag. */
     private volatile boolean stopping;
@@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Response message.
      * @throws IgniteCheckedException If failed.
      */
-    private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+    private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
         IgfsMessage msg) throws IgniteCheckedException {
-        IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+        final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
         if (log.isDebugEnabled())
             log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
 
-        IgfsControlResponse res = new IgfsControlResponse();
+        final IgfsControlResponse res = new IgfsControlResponse();
+
+        final String userName = req.userName();
+
+        assert userName != null;
 
         try {
-            switch (cmd) {
-                case EXISTS:
-                    res.response(igfs.exists(req.path()));
+            IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+                @Override public Void apply() {
+                    switch (cmd) {
+                        case EXISTS:
+                            res.response(igfs.exists(req.path()));
 
-                    break;
+                            break;
 
-                case INFO:
-                    res.response(igfs.info(req.path()));
+                        case INFO:
+                            res.response(igfs.info(req.path()));
 
-                    break;
+                            break;
 
-                case PATH_SUMMARY:
-                    res.response(igfs.summary(req.path()));
+                        case PATH_SUMMARY:
+                            res.response(igfs.summary(req.path()));
 
-                    break;
+                            break;
 
-                case UPDATE:
-                    res.response(igfs.update(req.path(), req.properties()));
+                        case UPDATE:
+                            res.response(igfs.update(req.path(), req.properties()));
 
-                    break;
+                            break;
 
-                case RENAME:
-                    igfs.rename(req.path(), req.destinationPath());
+                        case RENAME:
+                            igfs.rename(req.path(), req.destinationPath());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case DELETE:
-                    res.response(igfs.delete(req.path(), req.flag()));
+                        case DELETE:
+                            res.response(igfs.delete(req.path(), req.flag()));
 
-                    break;
+                            break;
 
-                case MAKE_DIRECTORIES:
-                    igfs.mkdirs(req.path(), req.properties());
+                        case MAKE_DIRECTORIES:
+                            igfs.mkdirs(req.path(), req.properties());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case LIST_PATHS:
-                    res.paths(igfs.listPaths(req.path()));
+                        case LIST_PATHS:
+                            res.paths(igfs.listPaths(req.path()));
 
-                    break;
+                            break;
 
-                case LIST_FILES:
-                    res.files(igfs.listFiles(req.path()));
+                        case LIST_FILES:
+                            res.files(igfs.listFiles(req.path()));
 
-                    break;
+                            break;
 
-                case SET_TIMES:
-                    igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+                        case SET_TIMES:
+                            igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case AFFINITY:
-                    res.locations(igfs.affinity(req.path(), req.start(), req.length()));
+                        case AFFINITY:
+                            res.locations(igfs.affinity(req.path(), req.start(), req.length()));
 
-                    break;
+                            break;
 
-                case OPEN_READ: {
-                    IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
-                        igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+                        case OPEN_READ: {
+                            IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+                                igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
 
-                    long streamId = registerResource(ses, igfsIn);
+                            long streamId = registerResource(ses, igfsIn);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
-                        igfsIn.fileInfo().modificationTime());
+                            IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+                                igfsIn.fileInfo().modificationTime());
 
-                    res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+                            res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_CREATE: {
-                    long streamId = registerResource(ses, igfs.create(
-                        req.path(),       // Path.
-                        bufSize,          // Buffer size.
-                        req.flag(),       // Overwrite if exists.
-                        affinityKey(req), // Affinity key based on replication factor.
-                        req.replication(),// Replication factor.
-                        req.blockSize(),  // Block size.
-                        req.properties()  // File properties.
-                    ));
+                        case OPEN_CREATE: {
+                            long streamId = registerResource(ses, igfs.create(
+                                req.path(),       // Path.
+                                bufSize,          // Buffer size.
+                                req.flag(),       // Overwrite if exists.
+                                affinityKey(req), // Affinity key based on replication factor.
+                                req.replication(),// Replication factor.
+                                req.blockSize(),  // Block size.
+                                req.properties()  // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_APPEND: {
-                    long streamId = registerResource(ses, igfs.append(
-                        req.path(),        // Path.
-                        bufSize,           // Buffer size.
-                        req.flag(),        // Create if absent.
-                        req.properties()   // File properties.
-                    ));
+                        case OPEN_APPEND: {
+                            long streamId = registerResource(ses, igfs.append(
+                                req.path(),        // Path.
+                                bufSize,           // Buffer size.
+                                req.flag(),        // Create if absent.
+                                req.properties()   // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                default:
-                    assert false : "Unhandled path control request command: " + cmd;
+                        default:
+                            assert false : "Unhandled path control request command: " + cmd;
 
-                    break;
-            }
+                            break;
+                    }
+
+                    return null;
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..b8095b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
  */
 class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     /** Delegate. */
-    private final IgfsImpl igfs;
+    private final IgfsEx igfs;
 
     /**
      * Constructor.
      *
      * @param igfs Delegate.
      */
-    IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+    IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
         this.igfs = igfs;
     }
 
@@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     @Override public Map<String, String> properties() {
         return Collections.emptyMap();
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        igfs.stop(true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
      */
     private class ClientWorker extends GridWorker {
         /** Connected client endpoint. */
-        private IpcEndpoint endpoint;
+        private final IpcEndpoint endpoint;
 
         /** Data output stream. */
         private final IgfsDataOutputStream out;
 
         /** Client session object. */
-        private IgfsClientSession ses;
+        private final IgfsClientSession ses;
 
         /** Queue node for fast unlink. */
         private ConcurrentLinkedDeque8.Node<ClientWorker> node;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 4b0234f..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 
@@ -88,4 +90,18 @@ public class IgfsUtils {
     private IgfsUtils() {
         // No-op.
     }
+
+    /**
+     * Provides non-null user name.
+     * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+     * which is the current process owner user.
+     * @param user a user name to be fixed.
+     * @return non-null interned user name.
+     */
+    public static String fixUserName(@Nullable String user) {
+        if (F.isEmpty(user))
+           user = FileSystemConfiguration.DFLT_USER_NAME;
+
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index ba891f8..6a630fb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.igfs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.jetbrains.annotations.*;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*;
 
 import java.io.*;
 import java.net.*;
@@ -37,15 +38,45 @@ import java.util.*;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 /**
- * Adapter to use any Hadoop file system {@link FileSystem} as  {@link IgfsSecondaryFileSystem}.
+ * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * In fact, this class deals with different FileSystems depending on the user context,
+ * see {@link IgfsUserContext#currentUser()}.
  */
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable {
-    /** Hadoop file system. */
-    private final FileSystem fileSys;
-
-    /** Properties of file system */
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem {
+    /** Properties of file system, see {@link #properties()}
+     *
+     * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}
+     * See {@link IgfsEx#SECONDARY_FS_URI}
+     * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
+     * */
     private final Map<String, String> props = new HashMap<>();
 
+    /** Secondary file system provider. */
+    private final SecondaryFileSystemProvider secProvider;
+
+    /** The default user name. It is used if no user context is set. */
+    private final String dfltUserName;
+
+    /** FileSystem instance created for the default user.
+     * Stored outside the fileSysLazyMap due to performance reasons. */
+    private final FileSystem dfltFs;
+
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new ValueFactory<String, FileSystem>() {
+            @Override public FileSystem createValue(String key) {
+                try {
+                    assert !F.isEmpty(key);
+
+                    return secProvider.createFileSystem(key);
+                }
+                catch (IOException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
     /**
      * Simple constructor that is to be used by default.
      *
@@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @throws IgniteCheckedException In case of error.
      */
     public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
-        @Nullable String userName)
-            throws IgniteCheckedException {
+        @Nullable String userName) throws IgniteCheckedException {
         // Treat empty uri and userName arguments as nulls to improve configuration usability:
         if (F.isEmpty(uri))
             uri = null;
@@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         if (F.isEmpty(userName))
             userName = null;
 
+        this.dfltUserName = IgfsUtils.fixUserName(userName);
+
         try {
-            SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName);
+            this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
 
-            fileSys = secProvider.createFileSystem();
+            // File system creation for the default user name.
+            // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
+            this.dfltFs = secProvider.createFileSystem(dfltUserName);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
 
-            uri = secProvider.uri().toString();
+        assert dfltFs != null;
 
-            if (!uri.endsWith("/"))
-                uri += "/";
+        uri = secProvider.uri().toString();
 
-            if (cfgPath != null)
-                props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        if (!uri.endsWith("/"))
+            uri += "/";
 
-            if (userName != null)
-                props.put(SECONDARY_FS_USER_NAME, userName);
+        if (cfgPath != null)
+            props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
 
-            props.put(SECONDARY_FS_URI, uri);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
+        props.put(SECONDARY_FS_URI, uri);
+        props.put(SECONDARY_FS_USER_NAME, dfltUserName);
     }
 
     /**
@@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @return Hadoop path.
      */
     private Path convert(IgfsPath path) {
-        URI uri = fileSys.getUri();
+        URI uri = fileSysForUser().getUri();
 
         return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
     }
@@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @param detailMsg Detailed error message.
      * @return Appropriate exception.
      */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
     private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
-            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
-        return !wrongVer ? cast(detailMsg, e) :
-            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
-                "version.", e);    }
+        return cast(detailMsg, e);
+    }
 
     /**
      * Cast IO exception to IGFS exception.
@@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean exists(IgfsPath path) {
         try {
-            return fileSys.exists(convert(path));
+            return fileSysForUser().exists(convert(path));
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
         HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
 
+        final FileSystem fileSys = fileSysForUser();
+
         try {
             if (props0.userName() != null || props0.groupName() != null)
                 fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
@@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Override public void rename(IgfsPath src, IgfsPath dest) {
         // Delegate to the secondary file system.
         try {
-            if (!fileSys.rename(convert(src), convert(dest)))
+            if (!fileSysForUser().rename(convert(src), convert(dest)))
                 throw new IgfsException("Failed to rename (secondary file system returned false) " +
                     "[src=" + src + ", dest=" + dest + ']');
         }
@@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean delete(IgfsPath path, boolean recursive) {
         try {
-            return fileSys.delete(convert(path), recursive);
+            return fileSysForUser().delete(convert(path), recursive);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path) {
         try {
-            if (!fileSys.mkdirs(convert(path)))
+            if (!fileSysForUser().mkdirs(convert(path)))
                 throw new IgniteException("Failed to make directories [path=" + path + "]");
         }
         catch (IOException e) {
@@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
         try {
-            if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+            if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
                 throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
         }
         catch (IOException e) {
@@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
-        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize);
+        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, boolean overwrite) {
         try {
-            return fileSys.create(convert(path), overwrite);
+            return fileSysForUser().create(convert(path), overwrite);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
             new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
 
         try {
-            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
-                null);
+            return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+                (short)replication, blockSize, null);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
         @Nullable Map<String, String> props) {
         try {
-            return fileSys.append(convert(path), bufSize);
+            return fileSysForUser().append(convert(path), bufSize);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public IgfsFile info(final IgfsPath path) {
         try {
-            final FileStatus status = fileSys.getFileStatus(convert(path));
+            final FileStatus status = fileSysForUser().getFileStatus(convert(path));
 
             if (status == null)
                 return null;
@@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         try {
             // We don't use FileSystem#getUsed() since it counts only the files
             // in the filesystem root, not all the files recursively.
-            return fileSys.getContentSummary(new Path("/")).getSpaceConsumed();
+            return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
@@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<String, String> properties() {
+    @Override public Map<String, String> properties() {
         return props;
     }
 
     /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
+    @Override public void close() throws IgniteException {
+        Exception e = null;
+
         try {
-            fileSys.close();
+            dfltFs.close();
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
+        catch (Exception e0) {
+            e = e0;
+        }
+
+        try {
+            fileSysLazyMap.close();
+        }
+        catch (IgniteCheckedException ice) {
+            if (e == null)
+                e = ice;
         }
+
+        if (e != null)
+            throw new IgniteException(e);
     }
 
     /**
      * Gets the underlying {@link FileSystem}.
+     * This method is used solely for testing.
      * @return the underlying Hadoop {@link FileSystem}.
      */
     public FileSystem fileSystem() {
-        return fileSys;
+        return fileSysForUser();
+    }
+
+    /**
+     * Gets the FileSystem for the current context user.
+     * @return the FileSystem instance, never null.
+     */
+    private FileSystem fileSysForUser() {
+        String user = IgfsUserContext.currentUser();
+
+        if (F.isEmpty(user))
+            user = dfltUserName; // default is never empty.
+
+        assert !F.isEmpty(user);
+
+        if (F.eq(user, dfltUserName))
+            return dfltFs; // optimization
+
+        return fileSysLazyMap.getOrCreate(user);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1f53a06..c0a9ade 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -97,21 +98,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>(){
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
+    /** working directory. */
+    private Path workingDir;
 
     /** Default replication factor. */
     private short dfltReplication;
@@ -129,6 +117,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Secondary URI string. */
     private URI secondaryUri;
 
+    /** The user name this file system was created on behalf of. */
+    private String user;
+
     /** IGFS mode resolver. */
     private IgfsModeResolver modeRslvr;
 
@@ -182,6 +173,36 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
+     * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+     * @return the user name, never null.
+     */
+    public static String getFsHadoopUser(Configuration cfg) throws IOException {
+        String user = null;
+
+        // -------------------------------------------
+        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
+        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
+        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
+        // ugi.doAs() closure.
+        if (cfg != null)
+            user = cfg.get(MRJobConfig.USER_NAME);
+        // -------------------------------------------
+
+        if (user == null) {
+            UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+            if (currUgi != null)
+                user = currUgi.getShortUserName();
+        }
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        return user;
+    }
+
+    /**
      * Public setter that can be used by direct users of FS or Visor.
      *
      * @param colocateFileWrites Whether all ongoing file writes should be colocated.
@@ -221,7 +242,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+            user = getFsHadoopUser(cfg);
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -244,7 +265,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -289,13 +310,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createFileSystem(user);
 
-                    secondaryFs = secProvider.createFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -306,6 +326,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
                             "will have no effect): " + e.getMessage());
                 }
             }
+
+            // set working directory to the home directory of the current Fs user:
+            setWorkingDirectory(null);
         }
         finally {
             leaveBusy();
@@ -849,22 +872,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
+        Path path = new Path("/user/" + user);
 
         return path.makeQualified(getUri(), null);
     }
 
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(null);
-    }
-
     /** {@inheritDoc} */
     @Override public void setWorkingDirectory(Path newPath) {
         if (newPath == null) {
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(homeDir));
 
-            workingDir.set(homeDir);
+            workingDir = homeDir;
         }
         else {
             Path fixedNewPath = fixRelativePart(newPath);
@@ -886,13 +898,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
 
-            workingDir.set(fixedNewPath);
+            workingDir = fixedNewPath;
         }
     }
 
     /** {@inheritDoc} */
     @Override public Path getWorkingDirectory() {
-        return workingDir.get();
+        return workingDir;
     }
 
     /** {@inheritDoc} */
@@ -1153,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             return null;
 
         return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+            new IgfsPath(convert(workingDir), path.toUri().getPath());
     }
 
     /**
@@ -1191,9 +1203,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
      */
     @SuppressWarnings("deprecation")
     private FileStatus convert(IgfsFile file) {
-        return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
-            file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
+        return new FileStatus(
+            file.length(),
+            file.isDirectory(),
+            getDefaultReplication(),
+            file.groupBlockSize(),
+            file.modificationTime(),
+            file.accessTime(),
+            permission(file),
+            file.property(PROP_USER_NAME, user),
+            file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
                 return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
@@ -1247,4 +1266,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
+
+    /**
+     * Returns the user name this File System is created on behalf of.
+     * @return the user name
+     */
+    public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 9cfb79b..f3fbe9c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -40,6 +39,7 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
@@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
+    /** The name of the user this File System created on behalf of. */
+    private final String user;
+
     /** Working directory. */
     private IgfsPath workingDir;
 
     /** URI. */
-    private URI uri;
+    private final URI uri;
 
     /** Authority. */
     private String uriAuthority;
@@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
+        user = getFsHadoopUser(cfg);
+
         try {
             initialize(name, cfg);
         }
@@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             throw e;
         }
 
-        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+        workingDir = new IgfsPath("/user/" + user);
     }
 
     /** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createAbstractFileSystem(user);
 
-                    secondaryFs = secProvider.createAbstractFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             file.modificationTime(),
             file.accessTime(),
             permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME),
+            file.property(PROP_USER_NAME, user),
             file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
@@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
-}
+
+    /**
+     * Returns the user name this File System is created on behalf of.
+     * @return the user name
+     */
+    public String user() {
+        return user;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..d493bd4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -126,11 +126,15 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                assert status.totalReducerCnt() > 0;
-
+                // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
+                // See https://issues.apache.org/jira/browse/IGNITE-764
                 setupProgress = 1;
                 mapProgress = 1;
-                reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+                if (status.totalReducerCnt() > 0)
+                    reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+                else
+                    reduceProgress = 1f;
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..b1a057c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 
 /**
  * Encapsulates logic of secondary filesystem creation.
@@ -36,9 +39,6 @@ public class SecondaryFileSystemProvider {
     /** The secondary filesystem URI, never null. */
     private final URI uri;
 
-    /** Optional user name to log into secondary filesystem with. */
-    private @Nullable final String userName;
-
     /**
      * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
      * specified either explicitly or in the configuration provided.
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider {
      * property in the provided configuration.
      * @param secConfPath the secondary Fs path (file path on the local file system, optional).
      * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
-     * @param userName User name.
      * @throws IOException
      */
     public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath, @Nullable String userName) throws IOException {
-        this.userName = userName;
-
+        final @Nullable String secConfPath) throws IOException {
         if (secConfPath != null) {
             URL url = U.resolveIgniteUrl(secConfPath);
 
@@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider {
      * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
      * @throws IOException
      */
-    public FileSystem createFileSystem() throws IOException {
+    public FileSystem createFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
         final FileSystem fileSys;
 
-        if (userName == null)
-            fileSys = FileSystem.get(uri, cfg);
-        else {
-            try {
-                fileSys = FileSystem.get(uri, cfg, userName);
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+        try {
+           fileSys = FileSystem.get(uri, cfg, userName);
+        }
+        catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
 
-                throw new IOException("Failed to create file system due to interrupt.", e);
-            }
+           throw new IOException("Failed to create file system due to interrupt.", e);
         }
 
         return fileSys;
@@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider {
 
     /**
      * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
-     * @throws IOException
+     * @throws IOException in case of error.
      */
-    public AbstractFileSystem createAbstractFileSystem() throws IOException {
-        return AbstractFileSystem.get(uri, cfg);
+    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
+        String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
+                @Override public AbstractFileSystem run() throws IOException {
+                    return AbstractFileSystem.get(uri, cfg);
+                }
+            });
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException("Failed to create file system due to interrupt.", ie);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
deleted file mode 100644
index 509f443..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
 
 /**
  * Utilities for configuring file systems to support the separate working directory per each thread.
@@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils {
     public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
 
     /**
-     * Set user name and default working directory for current thread if it's supported by file system.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgniteHadoopFileSystem)
-            ((IgniteHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof HadoopDistributedFileSystem)
-            ((HadoopDistributedFileSystem)fs).setUser(userName);
-    }
-
-    /**
      * Setup wrappers of filesystems to support the separate working directory.
      *
      * @param cfg Config for setup.
@@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
         cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
-
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
      * @throws IOException If failed.
      */
     public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * The user this Igfs instance works on behalf of.
+     * @return the user name.
+     */
+    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..47ba0e8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** Logger. */
     private final Log log;
 
+    /** The user this Igfs works on behalf of. */
+    private final String user;
+
     /**
      * Constructor.
      *
      * @param igfs Target IGFS.
      * @param log Log.
      */
-    public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+        this.user = IgfsUtils.fixUserName(userName);
+
         this.igfs = igfs;
+
         this.log = log;
 
         bufSize = igfs.configuration().getBlockSize() * 2;
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) {
-        igfs.clientLogDirectory(logDir);
+    @Override public IgfsHandshakeResponse handshake(final String logDir) {
+        return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply() {
+                igfs.clientLogDirectory(logDir);
 
-        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-            igfs.globalSampling());
+                return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+                    igfs.globalSampling());
+                }
+         });
     }
 
     /** {@inheritDoc} */
@@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.info(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.info(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            return igfs.update(path, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.update(path, props);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
         try {
-            igfs.setTimes(path, accessTime, modificationTime);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.setTimes(path, accessTime, modificationTime);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
         try {
-            igfs.rename(src, dest);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.rename(src, dest);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
         try {
-            return igfs.delete(path, recursive);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+                @Override public Boolean apply() {
+                    return igfs.delete(path, recursive);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
         try {
-            return igfs.globalSpace();
+            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+                @Override public IgfsStatus call() throws IgniteCheckedException {
+                    return igfs.globalSpace();
+                }
+            });
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
                 "stopping.");
         }
+        catch (IgniteCheckedException | RuntimeException | Error e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new AssertionError("Must never go there.");
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listPaths(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+                @Override public Collection<IgfsPath> apply() {
+                    return igfs.listPaths(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listFiles(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+                @Override public Collection<IgfsFile> apply() {
+                    return igfs.listFiles(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            igfs.mkdirs(path, props);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.mkdirs(path, props);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.summary(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+                @Override public IgfsPathSummary apply() {
+                    return igfs.summary(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
         throws IgniteCheckedException {
         try {
-            return igfs.affinity(path, start, len);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> apply() {
+                    return igfs.affinity(path, start, len);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
         throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+        final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+                        colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
         if (lsnr0 != null && log.isDebugEnabled())
             log.debug("Removed stream event listener [delegate=" + delegate + ']');
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 0264e7b..3561e95 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public class HadoopIgfsIpcIo implements HadoopIgfsIo {
     /** Logger. */
-    private Log log;
+    private final Log log;
 
     /** Request futures map. */
     private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =


Mime
View raw message