ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: [IGNITE-218]: Wrong staging permissions while running MR job under hadoop accelerator
Date Thu, 04 Jun 2015 15:22:14 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-5 20e567773 -> c9f729178


[IGNITE-218]: Wrong staging permissions while running MR job under hadoop accelerator


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f72917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f72917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f72917

Branch: refs/heads/ignite-sprint-5
Commit: c9f72917843092d596044197cf7cb05c56a13fca
Parents: 20e5677
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Thu Jun 4 18:20:24 2015 +0300
Committer: iveselovskiy <iveselovskiy@gridgain.com>
Committed: Thu Jun 4 18:20:24 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopTaskContext.java    |  14 +-
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   2 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  14 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  70 ++---
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   2 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++-
 .../hadoop/SecondaryFileSystemProvider.java     |   4 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  20 +-
 .../processors/hadoop/v2/HadoopV2Job.java       |  31 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  26 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  48 +++-
 .../hadoop/HadoopClientProtocolSelfTest.java    |   6 +-
 .../hadoop/HadoopAbstractSelfTest.java          |  14 +-
 .../hadoop/HadoopCommandLineTest.java           |  14 +-
 .../processors/hadoop/HadoopMapReduceTest.java  | 176 +++++++++++-
 .../hadoop/HadoopTaskExecutionSelfTest.java     |   2 +-
 .../hadoop/HadoopTasksAllVersionsTest.java      |  15 +-
 .../processors/hadoop/HadoopTasksV1Test.java    |   5 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   5 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   6 +-
 .../collections/HadoopAbstractMapTest.java      |  12 +
 22 files changed, 643 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..3d2ee17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,13 +21,14 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Task context.
  */
 public abstract class HadoopTaskContext {
     /** */
-    private final HadoopJob job;
+    protected final HadoopJob job;
 
     /** */
     private HadoopTaskInput input;
@@ -187,4 +188,15 @@ public abstract class HadoopTaskContext {
      * @throws IgniteCheckedException If failed.
      */
     public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+    /**
+     * Executes a callable on behalf of the job owner.
+     * In case of embedded task execution the implementation of this method
+     * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with.
+     * @param c The callable.
+     * @param <T> The return type of the Callable.
+     * @return The result of the callable.
+     * @throws IgniteCheckedException On any error in callable.
+     */
+    public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index b8095b8..44ee90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -121,6 +121,6 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteException {
-        igfs.stop(true);
+        // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 66e9761..d910507 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 
 import java.io.*;
@@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
 
     /** */
-    private static final String DEFAULT_USER_NAME = "anonymous";
-
-    /** */
     public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
 
     /** */
@@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
         throws IgniteCheckedException {
 
-        Configuration hadoopCfg = new Configuration();
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
 
         for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
             hadoopCfg.set(e.getKey(), e.getValue());
 
         String user = jobInfo.user();
 
-        if (F.isEmpty(user))
-            user = DEFAULT_USER_NAME;
+        user = IgfsUtils.fixUserName(user);
 
         String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
 
@@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
 
         try {
-            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
+            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+
+            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index c0a9ade..9d94e5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
@@ -144,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Custom-provided sequential reads before prefetch. */
     private int seqReadsBeforePrefetch;
 
-    /** The cache was disabled when the instance was creating. */
-    private boolean cacheEnabled;
-
     /** {@inheritDoc} */
     @Override public URI getUri() {
         if (uri == null)
@@ -173,27 +169,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
-     * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+     * Gets non-null user name as per the Hadoop file system viewpoint.
      * @return the user name, never null.
      */
-    public static String getFsHadoopUser(Configuration cfg) throws IOException {
-        String user = null;
-
-        // -------------------------------------------
-        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
-        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
-        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
-        // ugi.doAs() closure.
-        if (cfg != null)
-            user = cfg.get(MRJobConfig.USER_NAME);
-        // -------------------------------------------
-
-        if (user == null) {
-            UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
-
-            if (currUgi != null)
-                user = currUgi.getShortUserName();
-        }
+    public static String getFsHadoopUser() throws IOException {
+        UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+        String user = currUgi.getShortUserName();
 
         user = IgfsUtils.fixUserName(user);
 
@@ -228,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             setConf(cfg);
 
-            String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme());
-
-            cacheEnabled = !cfg.getBoolean(disableCacheName, false);
-
             mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false);
 
             if (!IGFS_SCHEME.equals(name.getScheme()))
@@ -242,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            user = getFsHadoopUser(cfg);
+            user = getFsHadoopUser();
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -360,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override protected void finalize() throws Throwable {
         super.finalize();
 
-        close0();
+        close();
     }
 
     /** {@inheritDoc} */
     @Override public void close() throws IOException {
-        if (cacheEnabled && get(getUri(), getConf()) == this)
-            return;
-
-        close0();
+        if (closeGuard.compareAndSet(false, true))
+            close0();
     }
 
     /**
@@ -377,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem {
      * @throws IOException If failed.
      */
     private void close0() throws IOException {
-        if (closeGuard.compareAndSet(false, true)) {
-            if (LOG.isDebugEnabled())
-                LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
+        if (LOG.isDebugEnabled())
+            LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']');
 
-            if (rmtClient == null)
-                return;
+        if (rmtClient == null)
+            return;
 
-            super.close();
+        super.close();
 
-            rmtClient.close(false);
+        rmtClient.close(false);
 
-            if (clientLog.isLogEnabled())
-                clientLog.close();
+        if (clientLog.isLogEnabled())
+            clientLog.close();
 
-            if (secondaryFs != null)
-                U.closeQuiet(secondaryFs);
+        if (secondaryFs != null)
+            U.closeQuiet(secondaryFs);
 
-            // Reset initialized resources.
-            uri = null;
-            rmtClient = null;
-        }
+        // Reset initialized resources.
+        uri = null;
+        rmtClient = null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index f3fbe9c..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
-        user = getFsHadoopUser(cfg);
+        user = getFsHadoopUser();
 
         try {
             initialize(name, cfg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
 
                         jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index d493bd4..68a9ef6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 
 /**
@@ -57,6 +63,41 @@ public class HadoopUtils {
     /** Old reducer class attribute. */
     private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
 
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
+            @Override public FileSystem createValue(FsCacheKey key) {
+                try {
+                    assert key != null;
+
+                    // Explicitly disable FileSystem caching:
+                    URI uri = key.uri();
+
+                    String scheme = uri.getScheme();
+
+                    // Copy the configuration to avoid altering the external object.
+                    Configuration cfg = new Configuration(key.configuration());
+
+                    String prop = HadoopUtils.disableFsCachePropertyName(scheme);
+
+                    cfg.setBoolean(prop, true);
+
+                    return FileSystem.get(uri, cfg, key.user());
+                }
+                catch (IOException | InterruptedException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
+    /**
+     * Constructor.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+
     /**
      * Wraps native split.
      *
@@ -126,8 +167,6 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
-                // See https://issues.apache.org/jira/browse/IGNITE-764
                 setupProgress = 1;
                 mapProgress = 1;
 
@@ -304,9 +343,242 @@ public class HadoopUtils {
     }
 
     /**
-     * Constructor.
+     * Creates {@link Configuration} in a correct class loader context to avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link Configuration}.
      */
-    private HadoopUtils() {
-        // No-op.
+    public static Configuration safeCreateConfiguration() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+        try {
+            return new Configuration();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Creates {@link JobConf} in a correct class loader context to avoid caching
+     * of inappropriate class loader in the Configuration object.
+     * @return New instance of {@link JobConf}.
+     */
+    public static JobConf safeCreateJobConf() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+
+        try {
+            return new JobConf();
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
+     * Gets non-null user name as per the Hadoop viewpoint.
+     * @param cfg the Hadoop job configuration, may be null.
+     * @return the user name, never null.
+     */
+    private static String getMrHadoopUser(Configuration cfg) throws IOException {
+        String user = cfg.get(MRJobConfig.USER_NAME);
+
+        if (user == null)
+            user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+        return user;
+    }
+
+    /**
+     * Common method to get the V1 file system in MapRed engine.
+     * It creates the filesystem for the user specified in the
+     * configuration with {@link MRJobConfig#USER_NAME} property.
+     * @param uri the file system uri.
+     * @param cfg the configuration.
+     * @return the file system
+     * @throws IOException
+     */
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException {
+        final String usr = getMrHadoopUser(cfg);
+
+        assert usr != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+
+        if (doCacheFs) {
+            try {
+                fs = getWithCaching(uri, cfg, usr);
+            }
+            catch (IgniteException ie) {
+                throw new IOException(ie);
+            }
+        }
+        else {
+            try {
+                fs = FileSystem.get(uri, cfg, usr);
+            }
+            catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+
+                throw new IOException(ie);
+            }
+        }
+
+        assert fs != null;
+        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
+
+        return fs;
+    }
+
+    /**
+     * Note that configuration is not a part of the key.
+     * It is used solely to initialize the first instance
+     * that is created for the key.
+     */
+    public static final class FsCacheKey {
+        /** */
+        private final URI uri;
+
+        /** */
+        private final String usr;
+
+        /** */
+        private final String equalityKey;
+
+        /** */
+        private final Configuration cfg;
+
+        /**
+         * Constructor
+         */
+        public FsCacheKey(URI uri, String usr, Configuration cfg) {
+            assert uri != null;
+            assert usr != null;
+            assert cfg != null;
+
+            this.uri = fixUri(uri, cfg);
+            this.usr = usr;
+            this.cfg = cfg;
+
+            this.equalityKey = createEqualityKey();
+        }
+
+        /**
+         * Creates String key used for equality and hashing.
+         */
+        private String createEqualityKey() {
+            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
+
+            if (uri.getScheme() != null)
+                sb.a(uri.getScheme().toLowerCase());
+
+            sb.a("://");
+
+            if (uri.getAuthority() != null)
+                sb.a(uri.getAuthority().toLowerCase());
+
+            return sb.toString();
+        }
+
+        /**
+         * The URI.
+         */
+        public URI uri() {
+            return uri;
+        }
+
+        /**
+         * The User.
+         */
+        public String user() {
+            return usr;
+        }
+
+        /**
+         * The Configuration.
+         */
+        public Configuration configuration() {
+            return cfg;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return equalityKey.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return equalityKey;
+        }
+    }
+
+    /**
+     * Gets FileSystem caching it in static Ignite cache. The cache is a singleton
+     * for each class loader.
+     *
+     * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}.
+     * The Configuration is not a part of the key. This means that for the given key file system is
+     * initialized only once with the Configuration passed in upon the file system creation.
+     *
+     * @param uri The file system URI.
+     * @param cfg The configuration.
+     * @param usr The user to create file system for.
+     * @return The file system: either created, or taken from the cache.
+     */
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) {
+        FsCacheKey key = new FsCacheKey(uri, usr, cfg);
+
+        return fileSysLazyMap.getOrCreate(key);
+    }
+
+    /**
+     * Gets the property name to disable file system cache.
+     * @param scheme The file system URI scheme.
+     * @return The property name. If scheme is null,
+     * returns "fs.null.impl.disable.cache".
+     */
+    public static String disableFsCachePropertyName(@Nullable String scheme) {
+        return String.format("fs.%s.impl.disable.cache", scheme);
+    }
+
+    /**
+     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
+     * @param uri0 The uri.
+     * @param cfg The cfg.
+     * @return Correct URI.
+     */
+    public static URI fixUri(URI uri0, Configuration cfg) {
+        if (uri0 == null)
+            return FileSystem.getDefaultUri(cfg);
+
+        String scheme = uri0.getScheme();
+        String authority = uri0.getAuthority();
+
+        if (authority == null) {
+            URI dfltUri = FileSystem.getDefaultUri(cfg);
+
+            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
+                return dfltUri;
+        }
+
+        return uri0;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index b1a057c..dd679de 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -34,7 +34,7 @@ import java.security.*;
  */
 public class SecondaryFileSystemProvider {
     /** Configuration of the secondary filesystem, never null. */
-    private final Configuration cfg = new Configuration();
+    private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
 
     /** The secondary filesystem URI, never null. */
     private final URI uri;
@@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider {
         }
 
         // Disable caching:
-        String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme());
+        String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme());
 
         cfg.setBoolean(prop, true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2e04ac1..b170125 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
     /** {@inheritDoc} */
     @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Implements actual task running.
+     * @throws IgniteCheckedException
+     */
+    void call0() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
 
         Throwable err = null;
@@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
         HadoopPerformanceCounter perfCntr = null;
 
         try {
-            ctx = job.getTaskContext(info);
-
             perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
 
             perfCntr.onTaskSubmit(info, submitTs);
@@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
             if (ctx != null)
                 ctx.cleanupTaskEnvironment();
         }
-
-        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index d265ca8..d754039 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.hadoop.v2;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.JobID;
@@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob {
         new ConcurrentHashMap8<>();
 
     /** Pooling task context class and thus class loading environment. */
-    private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
 
     /** All created contexts. */
     private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
@@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
-        HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
-
-        // Before create JobConf instance we should set new context class loader.
-        Thread.currentThread().setContextClassLoader(clsLdr);
-
-        jobConf = new JobConf();
+        jobConf = HadoopUtils.safeCreateJobConf();
 
         HadoopFileSystemsUtils.setupFileSystems(jobConf);
 
@@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+            try {
+                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true);
+
                 JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 
@@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob {
         if (old != null)
             return old.get();
 
-        Class<?> cls = taskCtxClsPool.poll();
+        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
 
         try {
             if (cls == null) {
@@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob {
                 // Note that the classloader identified by the task it was initially created for,
                 // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+                    "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
 
-                cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+                cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
                 fullCtxClsQueue.add(cls);
             }
@@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob {
 
     /** {@inheritDoc} */
     @Override public void cleanupStagingDirectory() {
-        if (rsrcMgr != null)
-            rsrcMgr.cleanupStagingDirectory();
+        rsrcMgr.cleanupStagingDirectory();
+    }
+
+    /**
+     * Getter for job configuration.
+     * @return The job configuration.
+     */
+    public JobConf jobConf() {
+        return jobConf;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..2f64e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -40,6 +40,9 @@ import java.util.*;
  * files are needed to be placed on local files system.
  */
 public class HadoopV2JobResourceManager {
+    /** File type Fs disable caching property name. */
+    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file");
+
     /** Hadoop job context. */
     private final JobContextImpl ctx;
 
@@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager {
         try {
             cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
 
-            if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+            if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
                 FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
         }
         finally {
@@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
 
                     if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
-                            stagingDir);
+                        throw new IgniteCheckedException("Failed to find map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
 
                     if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
-                            "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                        throw new IgniteCheckedException("Failed to copy job submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager {
                 }
             }
             else if (!jobLocDir.mkdirs())
-                throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+                throw new IgniteCheckedException("Failed to create local job directory: "
+                    + jobLocDir.getAbsolutePath());
 
             setLocalFSWorkingDirectory(jobLocDir);
         }
@@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = srcPath.getFileSystem(cfg);
+            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
 
                 if (!archivesPath.exists() && !archivesPath.mkdir())
                     throw new IOException("Failed to create directory " +
-                         "[path=" + archivesPath + ", jobId=" + jobId + ']');
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
 
                 File archiveFile = new File(archivesPath, locName);
 
@@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null)
-                stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
+                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index dd18c66..e89feba 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.security.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -419,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false);
             FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());
@@ -448,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             throw new IgniteCheckedException(e);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // if current UGI context user is the same, do direct call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.hadoop.mapreduce.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * @return Configuration.
      */
     private Configuration config(int port) {
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtils.safeCreateConfiguration();
 
         setupFileSystems(conf);
 
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
             ctx.getCounter(TestCounter.COUNTER2).increment(1);
 
             int sum = 0;
-            for (IntWritable value : values) {
+            for (IntWritable value : values)
                 sum += value.get();
-            }
 
             ctx.write(key, new IntWritable(sum));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..e8a0a6f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /** Initial REST port. */
     private int restPort = REST_PORT;
 
+    /** Secondary file system REST endpoint configuration. */
+    protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+    static {
+        SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+        SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+        SECONDARY_REST_CFG.setPort(11500);
+    }
+
+
     /** Initial classpath. */
     private static String initCp;
 
@@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /**
      * @return IGFS configuration.
      */
-    public FileSystemConfiguration igfsConfiguration() {
+    public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = new FileSystemConfiguration();
 
         cfg.setName(igfsName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
index d10ee5c..c66cdf3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import com.google.common.base.*;
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.resource.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jsr166.*;
 
@@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+        String cfgPath = "config/hadoop/default-config.xml";
+
+        IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+        IgniteConfiguration cfg = tup.get1();
+
+        cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+        igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..a1ef7ba 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
 public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** IGFS block size. */
+    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+    /** Amount of blocks to prefetch. */
+    protected static final int PREFETCH_BLOCKS = 1;
+
+    /** Amount of sequential block reads before prefetch is triggered. */
+    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+    /** Secondary file system URI. */
+    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+    /** Secondary file system configuration path. */
+    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+    /** The user to run Hadoop job on behalf of. */
+    protected static final String USER = "vasya";
+
+    /** Secondary IGFS name. */
+    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+    /** The secondary Ignite node. */
+    protected Ignite igniteSecondary;
+
+    /** The secondary Fs. */
+    protected IgfsSecondaryFileSystem secondaryFs;
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 3;
     }
 
     /**
+     * Gets owner of a IgfsEx path.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwner(IgfsEx i, IgfsPath p) {
+        return i.info(p).property(IgfsEx.PROP_USER_NAME);
+    }
+
+    /**
+     * Gets owner of a secondary Fs path.
+     * @param secFs The sec Fs.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+            @Override public String apply() {
+                return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+            }
+        });
+    }
+
+    /**
+     * Checks owner of the path.
+     * @param p The path.
+     */
+    private void checkOwner(IgfsPath p) {
+        String ownerPrim = getOwner(igfs, p);
+        assertEquals(USER, ownerPrim);
+
+        String ownerSec = getOwnerSecondary(secondaryFs, p);
+        assertEquals(USER, ownerSec);
+    }
+
+    /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
      */
@@ -71,7 +144,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             JobConf jobConf = new JobConf();
 
             jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser("yyy");
+            jobConf.setUser(USER);
             jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
 
             //To split into about 40 items for v2
@@ -105,14 +178,20 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             checkJobStatistics(jobId);
 
+            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+            checkOwner(new IgfsPath(outFile));
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
-                useNewReducer,
+                    useNewReducer,
                 "blue\t200000\n" +
-                "green\t150000\n" +
-                "red\t100000\n" +
-                "yellow\t70000\n",
-                readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
-            );
+                    "green\t150000\n" +
+                    "red\t100000\n" +
+                    "yellow\t70000\n",
+                readAndSortFile(outFile)
+                );
         }
     }
 
@@ -182,7 +261,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             }
         }
 
-        final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
 
         assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -212,4 +291,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
                 ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
         }
     }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+        super.beforeTest();
+    }
+
+    /**
+     * Start grid with IGFS.
+     *
+     * @param gridName Grid name.
+     * @param igfsName IGFS name
+     * @param mode IGFS mode.
+     * @param secondaryFs Secondary file system (optional).
+     * @param restCfg Rest configuration string (optional).
+     * @return Started grid instance.
+     * @throws Exception If failed.
+     */
+    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(igfsName);
+        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setIpcEndpointConfiguration(restCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setNearConfiguration(null);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return G.start(cfg);
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+        fsCfg.setSecondaryFileSystem(secondaryFs);
+
+        return fsCfg;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
 
     /** {@inheritDoc} */
-    @Override public FileSystemConfiguration igfsConfiguration() {
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = super.igfsConfiguration();
 
         cfg.setFragmentizerEnabled(false);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+    public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
 
     /**
      * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
                 igfs.info(inFile).length() - fileBlock1.length());
 
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
 
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+    private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
         int taskNum, String... words) throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
 
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testReduceTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
 
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testCombinerTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob("/", "/");
+        HadoopJob gridJob = getHadoopJob("/", "/");
 
         HadoopTestTaskContext ctx =
             runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context of combine task with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
         throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
 
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
 
-        HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
 
         setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.util.*;
 
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws Exception if fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         Job job = Job.getInstance();
 
         job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+        HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+        HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+        HadoopJob job = info.createJob(id, log);
 
         HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
             null));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..9395c5e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Abstract class for maps test.
@@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
             assert false;
         }
 
+        /** {@inheritDoc} */
         @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
             assert false;
         }
+
+        /** {@inheritDoc} */
+        @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+            try {
+                return c.call();
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
     }
 
     /**


Mime
View raw message