ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/5] incubator-ignite git commit: # IG-980: implemented 1 job class loader per node variant.
Date Fri, 19 Jun 2015 18:31:13 GMT
# IG-980: implemented 1 job class loader per node variant.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5285b72c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5285b72c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5285b72c

Branch: refs/heads/ignite-980
Commit: 5285b72c3341eebb555a3562d0395459b654fe94
Parents: acc2cb9
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Fri Jun 19 21:30:23 2015 +0300
Committer: iveselovskiy <iveselovskiy@gridgain.com>
Committed: Fri Jun 19 21:30:23 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopJobInfo.java        |  3 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |  3 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java | 57 +++++++++++----
 .../internal/processors/hadoop/HadoopUtils.java | 77 ++++++++------------
 .../hadoop/jobtracker/HadoopJobTracker.java     |  5 +-
 .../child/HadoopChildProcessRunner.java         |  5 +-
 .../processors/hadoop/v2/HadoopV2Job.java       | 65 ++++++++++++-----
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  7 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  2 +-
 .../processors/hadoop/HadoopTasksV1Test.java    |  6 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |  6 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |  6 +-
 .../collections/HadoopAbstractMapTest.java      |  2 +-
 13 files changed, 144 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 51faf5d..e676cbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.util.*;
 
 /**
  * Compact job description.
@@ -60,7 +61,7 @@ public interface HadoopJobInfo extends Serializable {
      * @return Job.
      * @throws IgniteCheckedException If failed.
      */
-    HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException;
+    public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, IgniteLogger log) throws
IgniteCheckedException;
 
     /**
      * @return Number of reducers configured for job.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 420cece..597ff8a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -72,8 +72,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
         try {
             hadoopCfg.set(MRJobConfig.USER_NAME, user);
 
-            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg,
-                jobId.toString(), null);
+            FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg,
jobId);
 
             fs.mkdirs(jobStatPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 2e855d0..9e685ea 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -18,13 +18,17 @@
 package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 import java.io.*;
 import java.lang.reflect.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Hadoop job info based on default Hadoop configuration.
@@ -49,7 +53,38 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable
{
     private String user;
 
     /** */
-    private static volatile Class<?> jobCls;
+    private static class CloseableClass implements Closeable {
+        private final Class<?> clazz;
+
+        CloseableClass(Class<?> c) {
+            clazz = c;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            // Noop
+        }
+
+        public Class<?> clazz() {
+            return clazz;
+        }
+    }
+
+    /** */
+    private static final HadoopLazyConcurrentMap<UUID, CloseableClass> hadoopV2JobClasses
=
+        new HadoopLazyConcurrentMap<>(new HadoopLazyConcurrentMap.ValueFactory<UUID,
CloseableClass>() {
+            @Override public CloseableClass createValue(UUID key) {
+                HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job-node-" +
key);
+
+                try {
+                    Class<?> jobCls = ldr.loadClass(HadoopV2Job.class.getName());
+
+                    return new CloseableClass(jobCls);
+                } catch (Exception ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        });
 
     /**
      * Default constructor required by {@link Externalizable}.
@@ -82,24 +117,16 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable
{
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException
{
+    @Override public HadoopJob createJob(UUID nodeId, HadoopJobId jobId, IgniteLogger log)
throws IgniteCheckedException {
         try {
-            Class<?> jobCls0 = jobCls;
-
-            if (jobCls0 == null) { // It is enough to have only one class loader with only
Hadoop classes.
-                synchronized (HadoopDefaultJobInfo.class) {
-                    if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
+            Class<?> jobCls0 = hadoopV2JobClasses.getOrCreate(nodeId).clazz();
 
-                        jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
-                    }
-                }
-            }
+            X.println("#### Creating job: HadoopJob: " + nodeId + ", class = " + jobCls0);
 
-            Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class,
HadoopDefaultJobInfo.class,
-                IgniteLogger.class);
+            Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class,
UUID.class,
+                HadoopDefaultJobInfo.class, IgniteLogger.class);
 
-            return (HadoopJob)constructor.newInstance(jobId, this, log);
+            return (HadoopJob)constructor.newInstance(jobId, nodeId, this, log);
         }
         // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call.
         catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index f50f0b3..2234549 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -33,10 +33,12 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Hadoop utility methods.
@@ -68,8 +70,8 @@ public class HadoopUtils {
         = createHadoopLazyConcurrentMap();
 
     /** File system cache for jobs. */
-    private static final Map<String, T2<HadoopLazyConcurrentMap<FsCacheKey,FileSystem>,
Set<String>>> jobFsMap
-        = new HashMap<>(16);
+    private static final ConcurrentMap<HadoopJobId, HadoopLazyConcurrentMap<FsCacheKey,FileSystem>>
jobFsMap
+        = new ConcurrentHashMap8<>();
 
     /**
      * Creates HadoopLazyConcurrentMap.
@@ -414,8 +416,8 @@ public class HadoopUtils {
      * @return the file system
      * @throws IOException
      */
-    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg,
-        @Nullable String jobId, @Nullable String locId) throws IOException {
+    public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, @Nullable
HadoopJobId jobId)
+            throws IOException {
         final String usr = getMrHadoopUser(cfg);
 
         assert usr != null;
@@ -426,7 +428,7 @@ public class HadoopUtils {
         final FileSystem fs;
 
         try {
-            fs = getWithCaching(uri, cfg, usr, jobId, locId);
+            fs = getWithCaching(uri, cfg, usr, jobId);
         }
         catch (IgniteException ie) {
             throw new IOException(ie);
@@ -545,14 +547,13 @@ public class HadoopUtils {
      * @param usr The user to create file system for.
      * @return The file system: either created, or taken from the cache.
      */
-    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr,
-            @Nullable String jobId, @Nullable String locId) {
+    private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr, @Nullable
HadoopJobId jobId) {
         final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
 
         if (jobId == null)
             return fileSysLazyMap.getOrCreate(key);
         else {
-            HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId,
locId);
+            HadoopLazyConcurrentMap<FsCacheKey,FileSystem> lm = getFsMapForJob(jobId);
 
             return lm.getOrCreate(key);
         }
@@ -562,67 +563,47 @@ public class HadoopUtils {
      * Gets Fs map for given job Id and localNodeId. If local node Id not null, registers
this
      * local node id to track subsequent removal.
      * @param jobId The job Id.
-     * @param locId The local node id.
      * @return File system map.
      */
-    private static synchronized HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(String
jobId,
-        @Nullable String locId) {
+    private static HadoopLazyConcurrentMap<FsCacheKey,FileSystem> getFsMapForJob(final
HadoopJobId jobId) {
         assert jobId != null;
 
-        T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>>
t2 = jobFsMap.get(jobId);
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = jobFsMap.get(jobId);
 
-        if (t2 == null) {
-            HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap();
+        if (lazy != null)
+            return lazy;
 
-            t2 = new T2<>(newLM, (Set<String>)new HashSet<String>());
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> newLM = createHadoopLazyConcurrentMap();
 
-            T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>>
pushedT2 = jobFsMap.put(jobId, t2);
+        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> pushedT2 = jobFsMap.putIfAbsent(jobId,
newLM);
 
-            assert pushedT2 == null;
-        }
-
-        if (locId != null) {
-            // If local  node Id is given, register this local Id for this job:
-            boolean added = t2.get2().add(locId);
+        if (pushedT2 == null)
+            lazy = newLM;
+        else {
+            lazy = pushedT2;
 
-            // new locId appears in HadoopV2Job#initialize(), no job with the same locId
should be registered:
-            assert added;
+            try {
+                newLM.close();
+            } catch (IgniteCheckedException ice) {
+                throw new IgniteException(ice);
+            }
         }
 
-        return t2.get1();
+        return lazy;
     }
 
     /**
      * Closes file system map for this job Id and local node id.
      * @param jobId The job id.
-     * @param locId The local node id.
      * @throws IgniteCheckedException
      */
-    public static synchronized void close(final String jobId, final String locId) throws
IgniteCheckedException {
+    public static synchronized void close(final HadoopJobId jobId) throws IgniteCheckedException
{
         assert jobId != null;
-        assert locId != null;
-
-        final T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>>
t2 = jobFsMap.get(jobId);
-
-        if (t2 != null) {
-            Set<String> locIds = t2.get2();
-
-            boolean rm = locIds.remove(locId);
 
-            assert rm;
+        final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> lazy = jobFsMap.remove(jobId);
 
-            final int usageCnt = locIds.size();
-
-            assert usageCnt >= 0 : "negative usage count " + usageCnt + ", map: " + t2.get1();
-
-            if (usageCnt == 0) {
-                T2<HadoopLazyConcurrentMap<FsCacheKey, FileSystem>, Set<String>>
rmT2 = jobFsMap.remove(jobId);
-
-                assert rmT2 == t2;
-
-                t2.get1().close();
-            }
-        }
+        if (lazy != null)
+            lazy.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 2f07817..b7377d4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -224,6 +224,7 @@ public class HadoopJobTracker extends HadoopComponent {
         // Fail all pending futures.
         for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values())
             fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job
(grid is stopping)."));
+
     }
 
     /**
@@ -986,7 +987,9 @@ public class HadoopJobTracker extends HadoopComponent {
                 jobInfo = meta.jobInfo();
             }
 
-            job = jobInfo.createJob(jobId, log);
+            UUID nodeId = ctx.localNodeId();
+
+            job = jobInfo.createJob(nodeId, jobId, log);
 
             job.initialize(false, ctx.localNodeId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 040552a..cfa393e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
@@ -115,7 +116,9 @@ public class HadoopChildProcessRunner {
 
                 assert job == null;
 
-                job = req.jobInfo().createJob(req.jobId(), log);
+                UUID nodeId = nodeDesc.parentNodeId();
+
+                job = req.jobInfo().createJob(nodeId, req.jobId(), log);
 
                 job.initialize(true, nodeDesc.processId());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index f48db98..6e957df 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -74,7 +74,7 @@ public class HadoopV2Job implements HadoopJob {
     private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
 
     /** Local node ID */
-    private volatile UUID locNodeId;
+    private final UUID locNodeId;
 
     /** Serialized JobConf. */
     private volatile byte[] jobConfData;
@@ -87,12 +87,18 @@ public class HadoopV2Job implements HadoopJob {
      * @param jobInfo Job info.
      * @param log Logger.
      */
-    public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger
log) {
+    public HadoopV2Job(HadoopJobId jobId, UUID locNodeId, final HadoopDefaultJobInfo jobInfo,
IgniteLogger log) {
         assert jobId != null;
+        assert locNodeId != null;
         assert jobInfo != null;
 
         this.jobId = jobId;
         this.jobInfo = jobInfo;
+        this.locNodeId = locNodeId;
+
+        // TODO: debug:
+        assert getClass().getClassLoader() instanceof HadoopClassLoader;
+        assert getClass().getClassLoader().toString().contains(locNodeId.toString());
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
@@ -138,7 +144,7 @@ public class HadoopV2Job implements HadoopJob {
             Path jobDir = new Path(jobDirPath);
 
             try {
-                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, jobId.toString(),
null);
+                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, jobId);
 
                 JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID,
fs, jobConf,
                     jobDir);
@@ -248,17 +254,15 @@ public class HadoopV2Job implements HadoopJob {
     /** {@inheritDoc} */
     @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException
{
         assert locNodeId != null;
-        assert this.locNodeId == null;
-
-        this.locNodeId = locNodeId;
+        assert this.locNodeId.equals(locNodeId);
 
         Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
 
-        try {
-            HadoopUtils.fileSystemForMrUser(null, jobConf, jobId.toString(), this.locNodeId.toString());
-        } catch (IOException ioe) {
-            throw new IgniteCheckedException(ioe);
-        }
+//        try {
+//            HadoopUtils.fileSystemForMrUser(null, jobConf);
+//        } catch (IOException ioe) {
+//            throw new IgniteCheckedException(ioe);
+//        }
 
         try {
             rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId));
@@ -273,6 +277,8 @@ public class HadoopV2Job implements HadoopJob {
     @Override public void dispose(boolean external) throws IgniteCheckedException {
         boolean dsp = disposed.compareAndSet(false, true);
 
+        X.println("###### Dispose: " + locNodeId + ", ldr = " + getClass().getClassLoader());
+
         if (!dsp)
             return;
 
@@ -300,11 +306,8 @@ public class HadoopV2Job implements HadoopJob {
                 try {
                     final ClassLoader ldr = cls.getClassLoader();
 
-                    Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME);
-
-                    Method m = daemonCls.getMethod("dequeueAndStopAll");
-
-                    m.invoke(null);
+                    // Stop Hadoop daemons for this *task*:
+                    stopHadoopFsDaemons(ldr);
 
                     // Also close all the FileSystems cached in
                     // HadoopLazyConcurrentMap for this *task* class loader:
@@ -322,25 +325,47 @@ public class HadoopV2Job implements HadoopJob {
             assert fullCtxClsQueue.isEmpty();
 
             // Close all cached Fs for this Job:
-            HadoopUtils.close(jobId.toString(), locNodeId.toString());
+            //HadoopUtils.close(jobId.toString(), locNodeId.toString());
+            //closeCachedFileSystems(getClass().getClassLoader());
+            HadoopUtils.close(jobId);
 
-            int i = 0;
-            while (i++ < 5)
-                System.gc();
+            invokeGc();
 
             if (err != null)
                 throw U.cast(err);
         }
     }
 
+    // TODO: remove
+    private void invokeGc() {
+        int i = 0;
+
+        while (i++ < 5)
+            System.gc();
+    }
+
     /** {@inheritDoc} */
     @Override protected void finalize() throws Throwable {
         super.finalize();
 
+        // TODO: remove
         dispose(false);
     }
 
     /**
+     * Stops Hadoop Fs daemon threads.
+     * @param ldr The task ClassLoader to stop the daemons for.
+     * @throws Exception On error.
+     */
+    private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception {
+        Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME);
+
+        Method m = daemonCls.getMethod("dequeueAndStopAll");
+
+        m.invoke(null);
+    }
+
+    /**
      * Closes all the file systems user by task
      * @param ldr The task class loader.
      * @throws Exception On error.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 7d13fe9..b86f16d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -115,7 +115,7 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg,
jobId.toString(), null);
+                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg,
jobId);
 
                     if (!fs.exists(stagingDir))
                         throw new IgniteCheckedException("Failed to find map-reduce submission
" +
@@ -210,7 +210,7 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, jobId.toString(),
null);
+            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, jobId);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
@@ -293,8 +293,7 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null) {
-                FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(),
-                    jobId.toString(), null);
+                FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(),
jobId);
 
                 fs.delete(stagingDir, true);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 3452678..7012566 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -428,7 +428,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         try {
             // Task class loader.
             // We also cache Fs there, all them will be cleared explicitly upon the Job end.
-            fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), null, null);
+            fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), null);
         }
         catch (IOException e) {
             throw new IgniteCheckedException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index 48e83cc..93707fe 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -44,9 +44,11 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
 
-        HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
+        UUID uuid = new UUID(0, 0);
 
-        return jobInfo.createJob(jobId, log);
+        HadoopJobId jobId = new HadoopJobId(uuid, 0);
+
+        return jobInfo.createJob(uuid, jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index e73fae3..bd4c343 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -62,9 +62,11 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
 
-        HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
+        UUID uuid = new UUID(0, 0);
 
-        return jobInfo.createJob(jobId, log);
+        HadoopJobId jobId = new HadoopJobId(uuid, 0);
+
+        return jobInfo.createJob(uuid, jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index f3b9307..a9fd2c9 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -68,9 +68,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
 
         HadoopDefaultJobInfo info = createJobInfo(cfg);
 
-        HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+        final UUID uuid = UUID.randomUUID();
 
-        HadoopJob job = info.createJob(id, log);
+        HadoopJobId id = new HadoopJobId(uuid, 1);
+
+        HadoopJob job = info.createJob(uuid, id, log);
 
         HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP,
null, 0, 0,
             null));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5285b72c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index 9395c5e..12b6611 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -136,7 +136,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
-        @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws
IgniteCheckedException {
+        @Override public HadoopJob createJob(UUID localNodeId, HadoopJobId jobId, IgniteLogger
log) throws IgniteCheckedException {
             assert false;
 
             return null;


Mime
View raw message