Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6981418175 for ; Fri, 26 Jun 2015 00:07:56 +0000 (UTC) Received: (qmail 5092 invoked by uid 500); 26 Jun 2015 00:07:56 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 5058 invoked by uid 500); 26 Jun 2015 00:07:56 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 5049 invoked by uid 99); 26 Jun 2015 00:07:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Jun 2015 00:07:56 +0000 X-ASF-Spam-Status: No, hits=-2001.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 26 Jun 2015 00:05:36 +0000 Received: (qmail 1072 invoked by uid 99); 26 Jun 2015 00:07:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Jun 2015 00:07:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BCB7BE36C9; Fri, 26 Jun 2015 00:07:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 26 Jun 2015 00:07:43 -0000 Message-Id: In-Reply-To: <6a338478984d4a90b375beb9f95ea837@git.apache.org> References: <6a338478984d4a90b375beb9f95ea837@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/50] incubator-ignite git commit: [IGNITE-980]: Investigate if we should close() the FileSystems in MapRed task implementations. X-Virus-Checked: Checked by ClamAV on apache.org [IGNITE-980]: Investigate if we should close() the FileSystems in MapRed task implementations. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3b49184b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3b49184b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3b49184b Branch: refs/heads/ignite-621 Commit: 3b49184b48ca55d5cf4dcdd7485ab8aba1bb9cff Parents: 36433bc Author: iveselovskiy Authored: Tue Jun 23 13:58:06 2015 +0300 Committer: iveselovskiy Committed: Tue Jun 23 13:58:06 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopJobInfo.java | 4 +- .../hadoop/counter/HadoopCounterWriter.java | 5 +- modules/hadoop/pom.xml | 78 ------ .../fs/IgniteHadoopFileSystemCounterWriter.java | 9 +- .../processors/hadoop/HadoopClassLoader.java | 29 +++ .../processors/hadoop/HadoopDefaultJobInfo.java | 27 +-- .../internal/processors/hadoop/HadoopUtils.java | 237 ------------------- .../hadoop/SecondaryFileSystemProvider.java | 3 +- .../hadoop/fs/HadoopFileSystemsUtils.java | 11 + .../hadoop/fs/HadoopLazyConcurrentMap.java | 5 + .../hadoop/jobtracker/HadoopJobTracker.java | 25 +- .../child/HadoopChildProcessRunner.java | 3 +- .../processors/hadoop/v2/HadoopV2Job.java | 84 ++++++- .../hadoop/v2/HadoopV2JobResourceManager.java | 22 +- .../hadoop/v2/HadoopV2TaskContext.java | 37 ++- .../apache/ignite/igfs/IgfsEventsTestSuite.java | 5 +- .../processors/hadoop/HadoopMapReduceTest.java | 2 +- .../processors/hadoop/HadoopTasksV1Test.java | 7 +- .../processors/hadoop/HadoopTasksV2Test.java | 7 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 6 +- .../collections/HadoopAbstractMapTest.java | 3 +- .../testsuites/IgniteHadoopTestSuite.java | 2 +- .../IgniteIgfsLinuxAndMacOSTestSuite.java | 3 +- 23 files changed, 237 insertions(+), 377 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index 51faf5d..d3735d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -55,12 +55,14 @@ public interface HadoopJobInfo extends Serializable { * This method will be called once for the same ID on one node, though it can be called on the same host * multiple times from different processes (in case of multiple nodes on the same host or external execution). * + * @param jobCls The job class. * @param jobId Job ID. * @param log Logger. * @return Job. * @throws IgniteCheckedException If failed. */ - HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + public HadoopJob createJob(Class jobCls, + HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; /** * @return Number of reducers configured for job. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java index ce67c57..f21a1e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java @@ -28,10 +28,9 @@ public interface HadoopCounterWriter { /** * Writes counters of given job to some statistics storage. * - * @param jobInfo Job info. - * @param jobId Job id. + * @param job The job. * @param cntrs Counters. * @throws IgniteCheckedException If failed. */ - public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException; + public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/modules/hadoop/pom.xml b/modules/hadoop/pom.xml index 637f097..e300ba0 100644 --- a/modules/hadoop/pom.xml +++ b/modules/hadoop/pom.xml @@ -143,84 +143,6 @@ maven-surefire-plugin 2.17 - - org.apache.hadoop:hadoop-annotations - org.apache.hadoop:hadoop-auth - commons-codec:commons-codec - org.apache.httpcomponents:httpclient - org.apache.httpcomponents:httpcore - org.apache.hadoop:hadoop-common - com.google.guava:guava - commons-cli:commons-cli - org.apache.commons:commons-math3 - xmlenc:xmlenc - commons-httpclient:commons-httpclient - commons-net:commons-net - javax.servlet:servlet-api - org.mortbay.jetty:jetty - org.mortbay.jetty:jetty-util - com.sun.jersey:jersey-core - com.sun.jersey:jersey-json - org.codehaus.jettison:jettison - com.sun.xml.bind:jaxb-impl - org.codehaus.jackson:jackson-jaxrs - org.codehaus.jackson:jackson-xc - com.sun.jersey:jersey-server - asm:asm - tomcat:jasper-compiler - tomcat:jasper-runtime - javax.servlet.jsp:jsp-api - commons-el:commons-el - - net.java.dev.jets3t:jets3t - com.jamesmurty.utils:java-xmlbuilder - org.codehaus.jackson:jackson-core-asl - org.codehaus.jackson:jackson-mapper-asl - - org.apache.avro:avro - com.thoughtworks.paranamer:paranamer - org.xerial.snappy:snappy-java - com.google.protobuf:protobuf-java - com.jcraft:jsch - com.google.code.findbugs:jsr305 - org.apache.zookeeper:zookeeper - - org.tukaani:xz - org.apache.hadoop:hadoop-hdfs - commons-daemon:commons-daemon - org.apache.hadoop:hadoop-mapreduce-client-common - - org.apache.hadoop:hadoop-yarn-common - org.apache.hadoop:hadoop-yarn-api - javax.xml.bind:jaxb-api - javax.xml.stream:stax-api - javax.activation:activation - com.google.inject:guice - javax.inject:javax.inject - com.sun.jersey.contribs:jersey-guice - org.apache.hadoop:hadoop-yarn-client - com.sun.jersey:jersey-client - org.apache.hadoop:hadoop-yarn-server-common - - com.google.inject.extensions:guice-servlet - - io.netty:netty - org.apache.hadoop:hadoop-mapreduce-client-core - - - org.hamcrest:hamcrest-core - - org.eclipse.jetty:jetty-http - commons-io:commons-io - - commons-configuration:commons-configuration - - org.eclipse.jetty:jetty-server - org.eclipse.jetty:jetty-util - org.eclipse.jetty:jetty-io - aopalliance:aopalliance - com.beust:jcommander - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index d910507..f76b354 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -25,6 +25,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; @@ -48,11 +49,15 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; /** {@inheritDoc} */ - @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) + @Override public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); + final HadoopJobInfo jobInfo = job.info(); + + final HadoopJobId jobId = job.id(); + for (Map.Entry e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) hadoopCfg.set(e.getKey(), e.getValue()); @@ -72,7 +77,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try { hadoopCfg.set(MRJobConfig.USER_NAME, user); - FileSystem fs = HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true); + FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index eb98ff9..0988fe0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -67,6 +67,28 @@ public class HadoopClassLoader extends URLClassLoader { private final String name; /** + * Gets name for Job class loader. The name is specific for local node id. + * @param locNodeId The local node id. + * @return The class loader name. + */ + public static String nameForJob(UUID locNodeId) { + return "hadoop-job-node-" + locNodeId.toString(); + } + + /** + * Gets name for the task class loader. Task class loader + * @param info The task info. + * @param prefix Get only prefix (without task type and number) + * @return The class loader name. + */ + public static String nameForTask(HadoopTaskInfo info, boolean prefix) { + if (prefix) + return "hadoop-task-" + info.jobId() + "-"; + else + return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber(); + } + + /** * @param urls Urls. */ public HadoopClassLoader(URL[] urls, String name) { @@ -568,4 +590,11 @@ public class HadoopClassLoader extends URLClassLoader { @Override public String toString() { return S.toString(HadoopClassLoader.class, this); } + + /** + * Getter for name field. + */ + public String name() { + return name; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 2e855d0..95e03c8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -48,9 +47,6 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { /** User name. */ private String user; - /** */ - private static volatile Class jobCls; - /** * Default constructor required by {@link Externalizable}. */ @@ -82,24 +78,15 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { - try { - Class jobCls0 = jobCls; + @Override public HadoopJob createJob(Class jobCls, + HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + assert jobCls != null; - if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. - synchronized (HadoopDefaultJobInfo.class) { - if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job"); - - jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); - } - } - } - - Constructor constructor = jobCls0.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class, - IgniteLogger.class); + try { + Constructor constructor = jobCls.getConstructor(HadoopJobId.class, + HadoopDefaultJobInfo.class, IgniteLogger.class); - return (HadoopJob)constructor.newInstance(jobId, this, log); + return constructor.newInstance(jobId, this, log); } // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 68a9ef6..f87e610 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -26,16 +26,10 @@ import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; -import org.apache.ignite.hadoop.fs.v1.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; import java.io.*; -import java.net.*; import java.util.*; /** @@ -63,34 +57,6 @@ public class HadoopUtils { /** Old reducer class attribute. */ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; - /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private static final HadoopLazyConcurrentMap fileSysLazyMap = new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory() { - @Override public FileSystem createValue(FsCacheKey key) { - try { - assert key != null; - - // Explicitly disable FileSystem caching: - URI uri = key.uri(); - - String scheme = uri.getScheme(); - - // Copy the configuration to avoid altering the external object. - Configuration cfg = new Configuration(key.configuration()); - - String prop = HadoopUtils.disableFsCachePropertyName(scheme); - - cfg.setBoolean(prop, true); - - return FileSystem.get(uri, cfg, key.user()); - } - catch (IOException | InterruptedException ioe) { - throw new IgniteException(ioe); - } - } - } - ); - /** * Constructor. */ @@ -378,207 +344,4 @@ public class HadoopUtils { } } - /** - * Gets non-null user name as per the Hadoop viewpoint. - * @param cfg the Hadoop job configuration, may be null. - * @return the user name, never null. - */ - private static String getMrHadoopUser(Configuration cfg) throws IOException { - String user = cfg.get(MRJobConfig.USER_NAME); - - if (user == null) - user = IgniteHadoopFileSystem.getFsHadoopUser(); - - return user; - } - - /** - * Common method to get the V1 file system in MapRed engine. - * It creates the filesystem for the user specified in the - * configuration with {@link MRJobConfig#USER_NAME} property. - * @param uri the file system uri. - * @param cfg the configuration. - * @return the file system - * @throws IOException - */ - public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException { - final String usr = getMrHadoopUser(cfg); - - assert usr != null; - - if (uri == null) - uri = FileSystem.getDefaultUri(cfg); - - final FileSystem fs; - - if (doCacheFs) { - try { - fs = getWithCaching(uri, cfg, usr); - } - catch (IgniteException ie) { - throw new IOException(ie); - } - } - else { - try { - fs = FileSystem.get(uri, cfg, usr); - } - catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - - throw new IOException(ie); - } - } - - assert fs != null; - assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); - - return fs; - } - - /** - * Note that configuration is not a part of the key. - * It is used solely to initialize the first instance - * that is created for the key. - */ - public static final class FsCacheKey { - /** */ - private final URI uri; - - /** */ - private final String usr; - - /** */ - private final String equalityKey; - - /** */ - private final Configuration cfg; - - /** - * Constructor - */ - public FsCacheKey(URI uri, String usr, Configuration cfg) { - assert uri != null; - assert usr != null; - assert cfg != null; - - this.uri = fixUri(uri, cfg); - this.usr = usr; - this.cfg = cfg; - - this.equalityKey = createEqualityKey(); - } - - /** - * Creates String key used for equality and hashing. - */ - private String createEqualityKey() { - GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); - - if (uri.getScheme() != null) - sb.a(uri.getScheme().toLowerCase()); - - sb.a("://"); - - if (uri.getAuthority() != null) - sb.a(uri.getAuthority().toLowerCase()); - - return sb.toString(); - } - - /** - * The URI. - */ - public URI uri() { - return uri; - } - - /** - * The User. - */ - public String user() { - return usr; - } - - /** - * The Configuration. - */ - public Configuration configuration() { - return cfg; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - return equalityKey.equals(((FsCacheKey)obj).equalityKey); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return equalityKey.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return equalityKey; - } - } - - /** - * Gets FileSystem caching it in static Ignite cache. The cache is a singleton - * for each class loader. - * - *

Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. - * The Configuration is not a part of the key. This means that for the given key file system is - * initialized only once with the Configuration passed in upon the file system creation. - * - * @param uri The file system URI. - * @param cfg The configuration. - * @param usr The user to create file system for. - * @return The file system: either created, or taken from the cache. - */ - private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { - FsCacheKey key = new FsCacheKey(uri, usr, cfg); - - return fileSysLazyMap.getOrCreate(key); - } - - /** - * Gets the property name to disable file system cache. - * @param scheme The file system URI scheme. - * @return The property name. If scheme is null, - * returns "fs.null.impl.disable.cache". - */ - public static String disableFsCachePropertyName(@Nullable String scheme) { - return String.format("fs.%s.impl.disable.cache", scheme); - } - - /** - * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). - * @param uri0 The uri. - * @param cfg The cfg. - * @return Correct URI. - */ - public static URI fixUri(URI uri0, Configuration cfg) { - if (uri0 == null) - return FileSystem.getDefaultUri(cfg); - - String scheme = uri0.getScheme(); - String authority = uri0.getAuthority(); - - if (authority == null) { - URI dfltUri = FileSystem.getDefaultUri(cfg); - - if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) - return dfltUri; - } - - return uri0; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index dd679de..ef04b0f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.security.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -76,7 +77,7 @@ public class SecondaryFileSystemProvider { } // Disable caching: - String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme()); + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme()); cfg.setBoolean(prop, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index d90bc28..382bbd0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.jetbrains.annotations.*; /** * Utilities for configuring file systems to support the separate working directory per each thread. @@ -37,4 +38,14 @@ public class HadoopFileSystemsUtils { cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". + */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index 71b38c4..c7565d3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -51,6 +51,8 @@ public class HadoopLazyConcurrentMap { */ public HadoopLazyConcurrentMap(ValueFactory factory) { this.factory = factory; + + assert getClass().getClassLoader() == Ignite.class.getClassLoader(); } /** @@ -105,6 +107,9 @@ public class HadoopLazyConcurrentMap { closeLock.writeLock().lock(); try { + if (closed) + return; + closed = true; Exception err = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 2f07817..194ae33 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; @@ -82,6 +83,9 @@ public class HadoopJobTracker extends HadoopComponent { /** Component busy lock. */ private GridSpinReadWriteLock busyLock; + /** Class to create HadoopJob instances from. */ + private Class jobCls; + /** Closure to check result of async transform of system cache. */ private final IgniteInClosure> failsLog = new CI1>() { @Override public void apply(IgniteInternalFuture gridFut) { @@ -95,12 +99,27 @@ public class HadoopJobTracker extends HadoopComponent { }; /** {@inheritDoc} */ - @Override public void start(HadoopContext ctx) throws IgniteCheckedException { + @SuppressWarnings("unchecked") + @Override public void start(final HadoopContext ctx) throws IgniteCheckedException { super.start(ctx); busyLock = new GridSpinReadWriteLock(); evtProcSvc = Executors.newFixedThreadPool(1); + + UUID nodeId = ctx.localNodeId(); + + assert jobCls == null; + + HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId)); + + try { + jobCls = (Class)ldr.loadClass(HadoopV2Job.class.getName()); + } + catch (Exception ioe) { + throw new IgniteCheckedException("Failed to load job class [class=" + + HadoopV2Job.class.getName() + ']', ioe); + } } /** @@ -838,7 +857,7 @@ public class HadoopJobTracker extends HadoopComponent { HadoopCounters cntrs = meta.counters(); - writer.write(job.info(), jobId, cntrs); + writer.write(job, cntrs); } } catch (Exception e) { @@ -986,7 +1005,7 @@ public class HadoopJobTracker extends HadoopComponent { jobInfo = meta.jobInfo(); } - job = jobInfo.createJob(jobId, log); + job = jobInfo.createJob(jobCls, jobId, log); job.initialize(false, ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 040552a..b0b0b8c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.hadoop.shuffle.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.offheap.unsafe.*; @@ -115,7 +116,7 @@ public class HadoopChildProcessRunner { assert job == null; - job = req.jobInfo().createJob(req.jobId(), log); + job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log); job.initialize(true, nodeDesc.processId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index d754039..33a218d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.v2; +import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; @@ -30,15 +31,18 @@ import org.apache.ignite.internal.processors.hadoop.v1.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import org.jsr166.*; import java.io.*; import java.lang.reflect.*; +import java.net.*; import java.util.*; import java.util.Queue; import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.*; /** * Hadoop job implementation for v2 API. @@ -70,7 +74,10 @@ public class HadoopV2Job implements HadoopJob { private final Queue> taskCtxClsPool = new ConcurrentLinkedQueue<>(); /** All created contexts. */ - private final Queue> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + private final Queue> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + + /** File system cache map. */ + private final HadoopLazyConcurrentMap fsMap = createHadoopLazyConcurrentMap(); /** Local node ID */ private volatile UUID locNodeId; @@ -103,7 +110,7 @@ public class HadoopV2Job implements HadoopJob { jobCtx = new JobContextImpl(jobConf, hadoopJobID); - rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log); + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); } /** {@inheritDoc} */ @@ -134,7 +141,7 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); try { - FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true); + FileSystem fs = fileSystem(jobDir.toUri(), jobConf); JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -180,6 +187,7 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { T2 locTaskId = new T2<>(info.type(), info.taskNumber()); @@ -201,7 +209,7 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); + HadoopClassLoader.nameForTask(info, false)); cls = (Class)ldr.loadClass(HadoopV2TaskContext.class.getName()); @@ -243,8 +251,13 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { + assert locNodeId != null; + this.locNodeId = locNodeId; + assert ((HadoopClassLoader)getClass().getClassLoader()).name() + .equals(HadoopClassLoader.nameForJob(this.locNodeId)); + Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); try { @@ -274,17 +287,26 @@ public class HadoopV2Job implements HadoopJob { // Stop the daemon threads that have been created // with the task class loaders: while (true) { - Class cls = fullCtxClsQueue.poll(); + Class cls = fullCtxClsQueue.poll(); if (cls == null) break; try { - Class daemonCls = cls.getClassLoader().loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME); + final ClassLoader ldr = cls.getClassLoader(); - Method m = daemonCls.getMethod("dequeueAndStopAll"); + try { + // Stop Hadoop daemons for this *task*: + stopHadoopFsDaemons(ldr); + } + catch (Exception e) { + if (err == null) + err = e; + } - m.invoke(null); + // Also close all the FileSystems cached in + // HadoopLazyConcurrentMap for this *task* class loader: + closeCachedTaskFileSystems(ldr); } catch (Throwable e) { if (err == null) @@ -297,11 +319,46 @@ public class HadoopV2Job implements HadoopJob { assert fullCtxClsQueue.isEmpty(); + try { + // Close all cached file systems for this *Job*: + fsMap.close(); + } + catch (Exception e) { + if (err == null) + err = e; + } + if (err != null) throw U.cast(err); } } + /** + * Stops Hadoop Fs daemon threads. + * @param ldr The task ClassLoader to stop the daemons for. + * @throws Exception On error. + */ + private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception { + Class daemonCls = ldr.loadClass(HadoopClassLoader.HADOOP_DAEMON_CLASS_NAME); + + Method m = daemonCls.getMethod("dequeueAndStopAll"); + + m.invoke(null); + } + + /** + * Closes all the file systems user by task + * @param ldr The task class loader. + * @throws Exception On error. + */ + private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception { + Class clazz = ldr.loadClass(HadoopV2TaskContext.class.getName()); + + Method m = clazz.getMethod("close"); + + m.invoke(null); + } + /** {@inheritDoc} */ @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); @@ -331,4 +388,15 @@ public class HadoopV2Job implements HadoopJob { public JobConf jobConf() { return jobConf; } + + /** + * Gets file system for this job. + * @param uri The uri. + * @param cfg The configuration. + * @return The file system. + * @throws IOException On error. + */ + public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { + return fileSystemForMrUserWithCaching(uri, cfg, fsMap); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 2f64e77..912cc3f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -39,9 +39,10 @@ import java.util.*; * Provides all resources are needed to the job execution. Downloads the main jar, the configuration and additional * files are needed to be placed on local files system. */ -public class HadoopV2JobResourceManager { +class HadoopV2JobResourceManager { /** File type Fs disable caching property name. */ - private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file"); + private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = + HadoopFileSystemsUtils.disableFsCachePropertyName("file"); /** Hadoop job context. */ private final JobContextImpl ctx; @@ -61,16 +62,20 @@ public class HadoopV2JobResourceManager { /** Staging directory to delivery job jar and config to the work nodes. */ private Path stagingDir; + /** The job. */ + private final HadoopV2Job job; + /** * Creates new instance. * @param jobId Job ID. * @param ctx Hadoop job context. * @param log Logger. */ - public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) { + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); + this.job = job; } /** @@ -115,7 +120,7 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true); + FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg); if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission " + @@ -210,7 +215,7 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true); + FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -292,8 +297,11 @@ public class HadoopV2JobResourceManager { */ public void cleanupStagingDirectory() { try { - if (stagingDir != null) - HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true); + if (stagingDir != null) { + FileSystem fs = job.fileSystem(stagingDir.toUri(), ctx.getJobConf()); + + fs.delete(stagingDir, true); + } } catch (Exception e) { log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index e89feba..6eff475 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -33,6 +33,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; @@ -44,6 +45,7 @@ import java.security.*; import java.util.*; import java.util.concurrent.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -54,6 +56,22 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** */ private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; + /** Lazy per-user file system cache used by the Hadoop task. */ + private static final HadoopLazyConcurrentMap fsMap + = createHadoopLazyConcurrentMap(); + + /** + * This method is called with reflection upon Job finish with class loader of each task. + * This will clean up all the Fs created for specific task. + * Each class loader sees uses its own instance of fsMap since the class loaders + * are different. + * + * @throws IgniteCheckedException On error. + */ + public static void close() throws IgniteCheckedException { + fsMap.close(); + } + /** * Check for combiner grouping support (available since Hadoop 2.3). */ @@ -91,7 +109,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private volatile HadoopTask task; /** Local node ID */ - private UUID locNodeId; + private final UUID locNodeId; /** Counters for task. */ private final HadoopCounters cntrs = new HadoopCountersImpl(); @@ -423,7 +441,22 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false); + FileSystem fs; + + try { + // This assertion uses .startsWith() instead of .equals() because task class loaders may + // be reused between tasks of the same job. + assert ((HadoopClassLoader)getClass().getClassLoader()).name() + .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true)); + + // We also cache Fs there, all them will be cleared explicitly upon the Job end. + fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + try ( FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index fb21e2d..e7c7f8a 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -21,7 +21,6 @@ import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; -import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; @@ -38,7 +37,7 @@ public class IgfsEventsTestSuite extends TestSuite { * @throws Exception Thrown in case of the failure. */ public static TestSuite suite() throws Exception { - HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); + ClassLoader ldr = TestSuite.class.getClassLoader(); TestSuite suite = new TestSuite("Ignite FS Events Test Suite"); @@ -58,7 +57,7 @@ public class IgfsEventsTestSuite extends TestSuite { * @throws Exception Thrown in case of the failure. */ public static TestSuite suiteNoarchOnly() throws Exception { - HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); + ClassLoader ldr = TestSuite.class.getClassLoader(); TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 66c14b5..da6f9c7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -272,7 +272,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { @Override public boolean apply() { return igfs.exists(statPath); } - }, 10000); + }, 20_000); final long apiEvtCnt0 = apiEvtCnt; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index 48e83cc..f59be19 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.util.*; @@ -44,9 +45,11 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); - HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); + UUID uuid = new UUID(0, 0); - return jobInfo.createJob(jobId, log); + HadoopJobId jobId = new HadoopJobId(uuid, 0); + + return jobInfo.createJob(HadoopV2Job.class, jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index e73fae3..1570807 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; import java.util.*; @@ -62,9 +63,11 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); - HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); + UUID uuid = new UUID(0, 0); - return jobInfo.createJob(jobId, log); + HadoopJobId jobId = new HadoopJobId(uuid, 0); + + return jobInfo.createJob(HadoopV2Job.class, jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index f3b9307..b8f62e6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -68,9 +68,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { HadoopDefaultJobInfo info = createJobInfo(cfg); - HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1); + final UUID uuid = UUID.randomUUID(); - HadoopJob job = info.createJob(id, log); + HadoopJobId id = new HadoopJobId(uuid, 1); + + HadoopJob job = info.createJob(HadoopV2Job.class, id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 9395c5e..b5e2ab5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -136,7 +136,8 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(Class jobCls, + HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { assert false; return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 4be5d72..2ab3e8c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -50,7 +50,7 @@ public class IgniteHadoopTestSuite extends TestSuite { downloadHadoop(); downloadHive(); - HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); + final ClassLoader ldr = TestSuite.class.getClassLoader(); TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3b49184b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java index 8982d83..22beea6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java @@ -20,7 +20,6 @@ package org.apache.ignite.testsuites; import junit.framework.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.processors.hadoop.*; import static org.apache.ignite.testsuites.IgniteHadoopTestSuite.*; @@ -36,7 +35,7 @@ public class IgniteIgfsLinuxAndMacOSTestSuite extends TestSuite { public static TestSuite suite() throws Exception { downloadHadoop(); - HadoopClassLoader ldr = new HadoopClassLoader(null, "test"); + ClassLoader ldr = TestSuite.class.getClassLoader(); TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And Mac OS");