ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [43/68] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.
Date Tue, 27 Sep 2016 15:26:33 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index a01bfaf..89b8028 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -17,37 +17,26 @@
 
 package org.apache.ignite.hadoop.fs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
 import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
 import org.apache.ignite.hadoop.util.UserNameMapper;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
 
 /**
  * Simple Hadoop file system factory which delegates to {@code FileSystem.get()} on each call.
  * <p>
  * If {@code "fs.[prefix].impl.disable.cache"} is not set to {@code true}, file system instances will be cached by Hadoop.
  */
-public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable, LifecycleAware {
+public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** File system URI. */
+        /** File system URI. */
     private String uri;
 
     /** File system config paths. */
@@ -56,12 +45,6 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
     /** User name mapper. */
     private UserNameMapper usrNameMapper;
 
-    /** Configuration of the secondary filesystem, never null. */
-    protected transient Configuration cfg;
-
-    /** Resulting URI. */
-    protected transient URI fullUri;
-
     /**
      * Constructor.
      */
@@ -70,64 +53,17 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
     }
 
     /** {@inheritDoc} */
-    @Override public final FileSystem get(String name) throws IOException {
-        String name0 = IgfsUtils.fixUserName(name);
-
-        if (usrNameMapper != null)
-            name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
-
-        return getWithMappedName(name0);
-    }
-
-    /**
-     * Internal file system create routine.
-     *
-     * @param usrName User name.
-     * @return File system.
-     * @throws IOException If failed.
-     */
-    protected FileSystem getWithMappedName(String usrName) throws IOException {
-        assert cfg != null;
-
-        try {
-            // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
-            // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
-            // classloader to classloader of current class to avoid strange class-cast-exceptions.
-            ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-            try {
-                return create(usrName);
-            }
-            finally {
-                HadoopUtils.restoreContextClassLoader(oldLdr);
-            }
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-    }
-
-    /**
-     * Internal file system creation routine, invoked in correct class loader context.
-     *
-     * @param usrName User name.
-     * @return File system.
-     * @throws IOException If failed.
-     * @throws InterruptedException if the current thread is interrupted.
-     */
-    protected FileSystem create(String usrName) throws IOException, InterruptedException {
-        return FileSystem.get(fullUri, cfg, usrName);
+    @Override public final Object get(String name) throws IOException {
+        throw new UnsupportedOperationException("Method should not be called directly.");
     }
 
     /**
      * Gets file system URI.
      * <p>
-     * This URI will be used as a first argument when calling {@link FileSystem#get(URI, Configuration, String)}.
+     * This URI will be used as a first argument when calling {@code FileSystem.get(URI, Configuration, String)}.
      * <p>
      * If not set, default URI will be picked from file system configuration using
-     * {@link FileSystem#getDefaultUri(Configuration)} method.
+     * {@code FileSystem.getDefaultUri(Configuration)} method.
      *
      * @return File system URI.
      */
@@ -149,11 +85,8 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
      * <p>
      * Path could be either absolute or relative to {@code IGNITE_HOME} environment variable.
      * <p>
-     * All provided paths will be loaded in the order they provided and then applied to {@link Configuration}. It means
+     * All provided paths will be loaded in the order they are provided and then applied to {@code Configuration}. It means
      * that path order might be important in some cases.
-     * <p>
-     * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
-     * {@link IgniteHadoopFileSystem} resides. Corresponding paths must exist on these machines as well.
      *
      * @return Paths to file system configuration files.
      */
@@ -198,50 +131,6 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        cfg = HadoopUtils.safeCreateConfiguration();
-
-        if (cfgPaths != null) {
-            for (String cfgPath : cfgPaths) {
-                if (cfgPath == null)
-                    throw new NullPointerException("Configuration path cannot be null: " + Arrays.toString(cfgPaths));
-                else {
-                    URL url = U.resolveIgniteUrl(cfgPath);
-
-                    if (url == null) {
-                        // If secConfPath is given, it should be resolvable:
-                        throw new IgniteException("Failed to resolve secondary file system configuration path " +
-                            "(ensure that it exists locally and you have read access to it): " + cfgPath);
-                    }
-
-                    cfg.addResource(url);
-                }
-            }
-        }
-
-        // If secondary fs URI is not given explicitly, try to get it from the configuration:
-        if (uri == null)
-            fullUri = FileSystem.getDefaultUri(cfg);
-        else {
-            try {
-                fullUri = new URI(uri);
-            }
-            catch (URISyntaxException use) {
-                throw new IgniteException("Failed to resolve secondary file system URI: " + uri);
-            }
-        }
-
-        if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
-            ((LifecycleAware)usrNameMapper).start();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
-            ((LifecycleAware)usrNameMapper).stop();
-    }
-
-    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, uri);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
index bcbb082..b90777c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/CachingHadoopFileSystemFactory.java
@@ -17,24 +17,14 @@
 
 package org.apache.ignite.hadoop.fs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-
-import java.io.IOException;
-import java.net.URI;
-
 /**
- * Caching Hadoop file system factory. Caches {@link FileSystem} instances on per-user basis. Doesn't rely on
+ * Caching Hadoop file system factory. Caches {@code FileSystem} instances on per-user basis. Doesn't rely on
  * built-in Hadoop {@code FileSystem} caching mechanics. Separate {@code FileSystem} instance is created for each
  * user instead.
  * <p>
- * This makes cache instance resistant to concurrent calls to {@link FileSystem#close()} in other parts of the user
+ * This makes cache instance resistant to concurrent calls to {@code FileSystem.close()} in other parts of the user
  * code. On the other hand, this might cause problems on some environments. E.g. if Kerberos is enabled, a call to
- * {@link FileSystem#get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation
+ * {@code FileSystem.get(URI, Configuration, String)} will refresh Kerberos token. But this factory implementation
  * calls this method only once per user, which may lead to token expiration. In such cases it makes sense to either
  * use {@link BasicHadoopFileSystemFactory} or implement your own factory.
  */
@@ -42,44 +32,10 @@ public class CachingHadoopFileSystemFactory extends BasicHadoopFileSystemFactory
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Per-user file system cache. */
-    private final transient HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
-        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
-            @Override public FileSystem createValue(String key) throws IOException {
-                return CachingHadoopFileSystemFactory.super.getWithMappedName(key);
-            }
-        }
-    );
-
     /**
-     * Public non-arg constructor.
+     * Constructor.
      */
     public CachingHadoopFileSystemFactory() {
-        // noop
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileSystem getWithMappedName(String name) throws IOException {
-        return cache.getOrCreate(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        super.start();
-
-        // Disable caching.
-        cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        super.stop();
-
-        try {
-            cache.close();
-        }
-        catch (IgniteCheckedException ice) {
-            throw new IgniteException(ice);
-        }
+        // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
index 5ad08ab..214328f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/HadoopFileSystemFactory.java
@@ -17,16 +17,13 @@
 
 package org.apache.ignite.hadoop.fs;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.lifecycle.LifecycleAware;
 
 import java.io.IOException;
 import java.io.Serializable;
 
 /**
- * Factory for Hadoop {@link FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}.
+ * Factory for Hadoop {@code FileSystem} used by {@link IgniteHadoopIgfsSecondaryFileSystem}.
  * <p>
  * {@link #get(String)} method will be used whenever a call to a target {@code FileSystem} is required.
  * <p>
@@ -35,10 +32,6 @@ import java.io.Serializable;
  * <p>
  * Concrete factory may implement {@link LifecycleAware} interface. In this case start and stop callbacks will be
  * performed by Ignite. You may want to implement some initialization or cleanup there.
- * <p>
- * Note that factory extends {@link Serializable} interface as it might be necessary to transfer factories over the
- * wire to {@link IgniteHadoopFileSystem} if {@link IgfsMode#PROXY} is enabled for some file
- * system paths.
  */
 public interface HadoopFileSystemFactory extends Serializable {
     /**
@@ -48,5 +41,5 @@ public interface HadoopFileSystemFactory extends Serializable {
      * @return File system.
      * @throws IOException In case of error.
      */
-    public FileSystem get(String usrName) throws IOException;
+    public Object get(String usrName) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 8085826..f1c1b16 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -17,25 +17,12 @@
 
 package org.apache.ignite.hadoop.fs;
 
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemCounterWriterDelegate;
 
 /**
  * Statistic writer implementation that writes info into any Hadoop file system.
@@ -47,57 +34,39 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     /** */
     public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
 
-    /** */
-    private static final String USER_MACRO = "${USER}";
+    /** Mutex. */
+    private final Object mux = new Object();
 
-    /** */
-    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
+    /** Delegate. */
+    private volatile HadoopFileSystemCounterWriterDelegate delegate;
 
     /** {@inheritDoc} */
     @Override public void write(HadoopJob job, HadoopCounters cntrs)
         throws IgniteCheckedException {
+        delegate(job).write(job, cntrs);
+    }
 
-        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
-
-        final HadoopJobInfo jobInfo = job.info();
-
-        final HadoopJobId jobId = job.id();
-
-        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
-            hadoopCfg.set(e.getKey(), e.getValue());
-
-        String user = jobInfo.user();
-
-        user = IgfsUtils.fixUserName(user);
-
-        String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
-
-        if (dir == null)
-            dir = DEFAULT_COUNTER_WRITER_DIR;
-
-        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
-
-        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
-
-        try {
-            hadoopCfg.set(MRJobConfig.USER_NAME, user);
+    /**
+     * Get delegate creating it if needed.
+     *
+     * @param job Job.
+     * @return Delegate.
+     */
+    private HadoopFileSystemCounterWriterDelegate delegate(HadoopJob job) {
+        HadoopFileSystemCounterWriterDelegate delegate0 = delegate;
 
-            FileSystem fs = ((HadoopV2Job)job).fileSystem(jobStatPath.toUri(), hadoopCfg);
+        if (delegate0 == null) {
+            synchronized (mux) {
+                delegate0 = delegate;
 
-            fs.mkdirs(jobStatPath);
+                if (delegate0 == null) {
+                    delegate0 = HadoopDelegateUtils.counterWriterDelegate(job.getClass().getClassLoader(), this);
 
-            try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
-                for (T2<String, Long> evt : perfCntr.evts()) {
-                    out.print(evt.get1());
-                    out.print(':');
-                    out.println(evt.get2().toString());
+                    delegate = delegate0;
                 }
-
-                out.flush();
             }
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
+
+        return delegate0;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 6b5c776..c9d08c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -17,62 +17,48 @@
 
 package org.apache.ignite.hadoop.fs;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.igfs.IgfsUserContext;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProperties;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
-import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
+import org.apache.ignite.internal.processors.igfs.IgfsKernalContextAware;
 import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
 /**
- * Secondary file system which delegates calls to an instance of Hadoop {@link FileSystem}.
+ * Secondary file system which delegates calls to Hadoop {@code org.apache.hadoop.fs.FileSystem}.
  * <p>
  * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}.
  */
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, LifecycleAware,
-    HadoopPayloadAware {
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystemV2, IgfsKernalContextAware,
+    LifecycleAware, HadoopPayloadAware {
     /** The default user name. It is used if no user context is set. */
     private String dfltUsrName;
 
     /** Factory. */
-    private HadoopFileSystemFactory fsFactory;
+    private HadoopFileSystemFactory factory;
+
+    /** Kernal context. */
+    private volatile GridKernalContext ctx;
+
+    /** Target. */
+    private volatile HadoopIgfsSecondaryFileSystemDelegate target;
 
     /**
      * Default constructor for Spring.
@@ -135,7 +121,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * Gets default user name.
      * <p>
      * Defines user name which will be used during file system invocation in case no user name is defined explicitly
-     * through {@link FileSystem#get(URI, Configuration, String)}.
+     * through {@code FileSystem.get(URI, Configuration, String)}.
      * <p>
      * Also this name will be used if you manipulate {@link IgniteFileSystem} directly and do not set user name
      * explicitly using {@link IgfsUserContext#doAs(String, IgniteOutClosure)} or
@@ -162,14 +148,14 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /**
      * Gets secondary file system factory.
      * <p>
-     * This factory will be used whenever a call to a target {@link FileSystem} is required.
+     * This factory will be used whenever a call to a target {@code FileSystem} is required.
      * <p>
      * If not set, {@link CachingHadoopFileSystemFactory} will be used.
      *
      * @return Secondary file system factory.
      */
     public HadoopFileSystemFactory getFileSystemFactory() {
-        return fsFactory;
+        return factory;
     }
 
     /**
@@ -178,403 +164,115 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @param factory Secondary file system factory.
      */
     public void setFileSystemFactory(HadoopFileSystemFactory factory) {
-        this.fsFactory = factory;
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        URI uri = fileSystemForUser().getUri();
-
-        return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
-    }
-
-    /**
-     * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
-     *
-     * @param e Exception to check.
-     * @param detailMsg Detailed error message.
-     * @return Appropriate exception.
-     */
-    private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        return cast(detailMsg, e);
-    }
-
-    /**
-     * Cast IO exception to IGFS exception.
-     *
-     * @param e IO exception.
-     * @return IGFS exception.
-     */
-    public static IgfsException cast(String msg, IOException e) {
-        if (e instanceof FileNotFoundException)
-            return new IgfsPathNotFoundException(e);
-        else if (e instanceof ParentNotDirectoryException)
-            return new IgfsParentNotDirectoryException(msg, e);
-        else if (e instanceof PathIsNotEmptyDirectoryException)
-            return new IgfsDirectoryNotEmptyException(e);
-        else if (e instanceof PathExistsException)
-            return new IgfsPathAlreadyExistsException(msg, e);
-        else
-            return new IgfsException(msg, e);
-    }
-
-    /**
-     * Convert Hadoop FileStatus properties to map.
-     *
-     * @param status File status.
-     * @return IGFS attributes.
-     */
-    private static Map<String, String> properties(FileStatus status) {
-        FsPermission perm = status.getPermission();
-
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        HashMap<String, String> res = new HashMap<>(3);
-
-        res.put(IgfsUtils.PROP_PERMISSION, String.format("%04o", perm.toShort()));
-        res.put(IgfsUtils.PROP_USER_NAME, status.getOwner());
-        res.put(IgfsUtils.PROP_GROUP_NAME, status.getGroup());
-
-        return res;
+        this.factory = factory;
     }
 
     /** {@inheritDoc} */
     @Override public boolean exists(IgfsPath path) {
-        try {
-            return fileSystemForUser().exists(convert(path));
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
-        }
+        return target.exists(path);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
-        HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
-
-        final FileSystem fileSys = fileSystemForUser();
-
-        try {
-            if (props0.userName() != null || props0.groupName() != null)
-                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
-
-            if (props0.permission() != null)
-                fileSys.setPermission(convert(path), props0.permission());
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
-        }
-
-        //Result is not used in case of secondary FS.
-        return null;
+        return target.update(path, props);
     }
 
     /** {@inheritDoc} */
     @Override public void rename(IgfsPath src, IgfsPath dest) {
-        // Delegate to the secondary file system.
-        try {
-            if (!fileSystemForUser().rename(convert(src), convert(dest)))
-                throw new IgfsException("Failed to rename (secondary file system returned false) " +
-                    "[src=" + src + ", dest=" + dest + ']');
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']');
-        }
+        target.rename(src, dest);
     }
 
     /** {@inheritDoc} */
     @Override public boolean delete(IgfsPath path, boolean recursive) {
-        try {
-            return fileSystemForUser().delete(convert(path), recursive);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
-        }
+        return target.delete(path, recursive);
     }
 
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path) {
-        try {
-            if (!fileSystemForUser().mkdirs(convert(path)))
-                throw new IgniteException("Failed to make directories [path=" + path + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]");
-        }
+        target.mkdirs(path);
     }
 
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
-        try {
-            if (!fileSystemForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
-                throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]");
-        }
+        target.mkdirs(path, props);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
-            Collection<IgfsPath> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus status : statuses)
-                res.add(new IgfsPath(path, status.getPath().getName()));
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
-        }
+        return target.listPaths(path);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
-        try {
-            FileStatus[] statuses = fileSystemForUser().listStatus(convert(path));
-
-            if (statuses == null)
-                throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-
-            Collection<IgfsFile> res = new ArrayList<>(statuses.length);
-
-            for (FileStatus s : statuses) {
-                IgfsEntryInfo fsInfo = s.isDirectory() ?
-                    IgfsUtils.createDirectory(
-                        IgniteUuid.randomUuid(),
-                        null,
-                        properties(s),
-                        s.getAccessTime(),
-                        s.getModificationTime()
-                    ) :
-                    IgfsUtils.createFile(
-                        IgniteUuid.randomUuid(),
-                        (int)s.getBlockSize(),
-                        s.getLen(),
-                        null,
-                        null,
-                        false,
-                        properties(s),
-                        s.getAccessTime(),
-                        s.getModificationTime()
-                    );
-
-                res.add(new IgfsFileImpl(new IgfsPath(path, s.getPath().getName()), fsInfo, 1));
-            }
-
-            return res;
-        }
-        catch (FileNotFoundException ignored) {
-            throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path);
-        }
+        return target.listFiles(path);
     }
 
     /** {@inheritDoc} */
     @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
-        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSystemForUser(), convert(path), bufSize);
+        return target.open(path, bufSize);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, boolean overwrite) {
-        try {
-            return fileSystemForUser().create(convert(path), overwrite);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
-        }
+        return target.create(path, overwrite);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
         long blockSize, @Nullable Map<String, String> props) {
-        HadoopIgfsProperties props0 =
-            new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
-
-        try {
-            return fileSystemForUser().create(convert(path), props0.permission(), overwrite, bufSize,
-                (short) replication, blockSize, null);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
-                ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication +
-                ", blockSize=" + blockSize + "]");
-        }
+        return target.create(path, bufSize, overwrite, replication, blockSize, props);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
         @Nullable Map<String, String> props) {
-        try {
-            return fileSystemForUser().append(convert(path), bufSize);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
-        }
+        return target.append(path, bufSize, create, props);
     }
 
     /** {@inheritDoc} */
     @Override public IgfsFile info(final IgfsPath path) {
-        try {
-            final FileStatus status = fileSystemForUser().getFileStatus(convert(path));
-
-            if (status == null)
-                return null;
-
-            final Map<String, String> props = properties(status);
-
-            return new IgfsFile() {
-                @Override public IgfsPath path() {
-                    return path;
-                }
-
-                @Override public boolean isFile() {
-                    return status.isFile();
-                }
-
-                @Override public boolean isDirectory() {
-                    return status.isDirectory();
-                }
-
-                @Override public int blockSize() {
-                    // By convention directory has blockSize == 0, while file has blockSize > 0:
-                    return isDirectory() ? 0 : (int)status.getBlockSize();
-                }
-
-                @Override public long groupBlockSize() {
-                    return status.getBlockSize();
-                }
-
-                @Override public long accessTime() {
-                    return status.getAccessTime();
-                }
-
-                @Override public long modificationTime() {
-                    return status.getModificationTime();
-                }
-
-                @Override public String property(String name) throws IllegalArgumentException {
-                    String val = props.get(name);
-
-                    if (val ==  null)
-                        throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']');
-
-                    return val;
-                }
-
-                @Nullable @Override public String property(String name, @Nullable String dfltVal) {
-                    String val = props.get(name);
-
-                    return val == null ? dfltVal : val;
-                }
-
-                @Override public long length() {
-                    return status.getLen();
-                }
-
-                /** {@inheritDoc} */
-                @Override public Map<String, String> properties() {
-                    return props;
-                }
-            };
-        }
-        catch (FileNotFoundException ignore) {
-            return null;
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]");
-        }
+        return target.info(path);
     }
 
     /** {@inheritDoc} */
     @Override public long usedSpaceSize() {
-        try {
-            // We don't use FileSystem#getUsed() since it counts only the files
-            // in the filesystem root, not all the files recursively.
-            return fileSystemForUser().getContentSummary(new Path("/")).getSpaceConsumed();
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
-        }
+        return target.usedSpaceSize();
     }
 
     /** {@inheritDoc} */
     @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
-        try {
-            // We don't use FileSystem#getUsed() since it counts only the files
-            // in the filesystem root, not all the files recursively.
-            fileSystemForUser().setTimes(convert(path), modificationTime, accessTime);
-        }
-        catch (IOException e) {
-            throw handleSecondaryFsError(e, "Failed set times for path: " + path);
-        }
+        target.setTimes(path, accessTime, modificationTime);
     }
 
-    /**
-     * Gets the underlying {@link FileSystem}.
-     * This method is used solely for testing.
-     * @return the underlying Hadoop {@link FileSystem}.
-     */
-    public FileSystem fileSystem() {
-        return fileSystemForUser();
+    /** {@inheritDoc} */
+    @Override public void setKernalContext(GridKernalContext ctx) {
+        this.ctx = ctx;
     }
 
-    /**
-     * Gets the FileSystem for the current context user.
-     * @return the FileSystem instance, never null.
-     */
-    private FileSystem fileSystemForUser() {
-        String user = IgfsUserContext.currentUser();
-
-        if (F.isEmpty(user))
-            user = IgfsUtils.fixUserName(dfltUsrName);
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        HadoopClassLoader ldr = ctx.hadoopHelper().commonClassLoader();
 
-        assert !F.isEmpty(user);
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(ldr);
 
         try {
-            return fsFactory.get(user);
+            target = HadoopDelegateUtils.secondaryFileSystemDelegate(ldr, this);
+
+            target.start();
         }
-        catch (IOException ioe) {
-            throw new IgniteException(ioe);
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        dfltUsrName = IgfsUtils.fixUserName(dfltUsrName);
-
-        if (fsFactory == null)
-            fsFactory = new CachingHadoopFileSystemFactory();
-
-        if (fsFactory instanceof LifecycleAware)
-            ((LifecycleAware) fsFactory).start();
-    }
-
-    /** {@inheritDoc} */
     @Override public void stop() throws IgniteException {
-        if (fsFactory instanceof LifecycleAware)
-            ((LifecycleAware)fsFactory).stop();
+        if (target != null)
+            target.stop();
     }
 
     /** {@inheritDoc} */
     @Override public HadoopFileSystemFactory getPayload() {
-        return fsFactory;
+        return factory;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
index bbfbc59..46d626b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/KerberosHadoopFileSystemFactory.java
@@ -17,19 +17,12 @@
 
 package org.apache.ignite.hadoop.fs;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.security.PrivilegedExceptionAction;
 
 /**
  * Secure Hadoop file system factory that can work with underlying file system protected with Kerberos.
@@ -57,9 +50,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
     /** The re-login interval. See {@link #getReloginInterval()} for more information. */
     private long reloginInterval = DFLT_RELOGIN_INTERVAL;
 
-    /** Time of last re-login attempt, in system milliseconds. */
-    private transient volatile long lastReloginTime;
-
     /**
      * Constructor.
      */
@@ -67,25 +57,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public FileSystem getWithMappedName(String name) throws IOException {
-        reloginIfNeeded();
-
-        return super.getWithMappedName(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected FileSystem create(String usrName) throws IOException, InterruptedException {
-        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(usrName,
-            UserGroupInformation.getLoginUser());
-
-        return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-            @Override public FileSystem run() throws Exception {
-                return FileSystem.get(fullUri, cfg);
-            }
-        });
-    }
-
     /**
      * Gets the key tab principal short name (e.g. "hdfs").
      *
@@ -106,9 +77,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
 
     /**
      * Gets the key tab full file name (e.g. "/etc/security/keytabs/hdfs.headless.keytab" or "/etc/krb5.keytab").
-     * <p>
-     * <b>NOTE!</b> Factory can be serialized and transferred to other machines where instance of
-     * {@link IgniteHadoopFileSystem} resides. Corresponding path must exist on these machines as well.
      *
      * @return The key tab file name.
      */
@@ -136,10 +104,8 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
      * Negative values are not allowed.
      *
      * <p>Note, however, that it does not make sense to make this value small, because Hadoop does not allow to
-     * login if less than {@link org.apache.hadoop.security.UserGroupInformation#MIN_TIME_BEFORE_RELOGIN} milliseconds
+     * login if less than {@code org.apache.hadoop.security.UserGroupInformation.MIN_TIME_BEFORE_RELOGIN} milliseconds
      * have passed since the time of the previous login.
-     * See {@link org.apache.hadoop.security.UserGroupInformation#hasSufficientTimeElapsed(long)} and its usages for
-     * more detail.
      *
      * @return The re-login interval, in milliseconds.
      */
@@ -157,47 +123,6 @@ public class KerberosHadoopFileSystemFactory extends BasicHadoopFileSystemFactor
     }
 
     /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        A.ensure(!F.isEmpty(keyTab), "keyTab cannot not be empty.");
-        A.ensure(!F.isEmpty(keyTabPrincipal), "keyTabPrincipal cannot not be empty.");
-        A.ensure(reloginInterval >= 0, "reloginInterval cannot not be negative.");
-
-        super.start();
-
-        try {
-            UserGroupInformation.setConfiguration(cfg);
-            UserGroupInformation.loginUserFromKeytab(keyTabPrincipal, keyTab);
-        }
-        catch (IOException ioe) {
-            throw new IgniteException("Failed login from keytab [keyTab=" + keyTab +
-                ", keyTabPrincipal=" + keyTabPrincipal + ']', ioe);
-        }
-    }
-
-    /**
-     * Re-logins the user if needed.
-     * First, the re-login interval defined in factory is checked. The re-login attempts will be not more
-     * frequent than one attempt per {@code reloginInterval}.
-     * Second, {@link UserGroupInformation#checkTGTAndReloginFromKeytab()} method invoked that gets existing
-     * TGT and checks its validity. If the TGT is expired or is close to expiry, it performs re-login.
-     *
-     * <p>This operation expected to be called upon each operation with the file system created with the factory.
-     * As long as {@link #get(String)} operation is invoked upon each file {@link IgniteHadoopFileSystem}, there
-     * is no need to invoke it otherwise specially.
-     *
-     * @throws IOException If login fails.
-     */
-    private void reloginIfNeeded() throws IOException {
-        long now = System.currentTimeMillis();
-
-        if (now >= lastReloginTime + reloginInterval) {
-            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
-
-            lastReloginTime = now;
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         super.writeExternal(out);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index a06129e..7133c08 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -42,12 +42,14 @@ import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.IgfsPathSummary;
 import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper;
 import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
 import org.apache.ignite.internal.processors.igfs.IgfsPaths;
@@ -58,7 +60,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.BufferedOutputStream;
@@ -78,13 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
 
 /**
@@ -165,7 +166,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
     private IgfsModeResolver modeRslvr;
 
     /** The secondary file system factory. */
-    private HadoopFileSystemFactory factory;
+    private HadoopFileSystemFactoryDelegate factory;
 
     /** Management connection flag. */
     private boolean mgmt;
@@ -332,7 +333,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             if (initSecondary) {
                 try {
-                    factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+                    HadoopFileSystemFactory factory0 =
+                        (HadoopFileSystemFactory)paths.getPayload(getClass().getClassLoader());
+
+                    factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
                 }
                 catch (IgniteCheckedException e) {
                     throw new IOException("Failed to get secondary file system factory.", e);
@@ -343,11 +347,10 @@ public class IgniteHadoopFileSystem extends FileSystem {
                         IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " +
                         FileSystemConfiguration.class.getName() + "?)");
 
-                if (factory instanceof LifecycleAware)
-                    ((LifecycleAware) factory).start();
+                factory.start();
 
                 try {
-                    FileSystem secFs = factory.get(user);
+                    FileSystem secFs = (FileSystem)factory.get(user);
 
                     secondaryUri = secFs.getUri();
 
@@ -423,8 +426,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
         if (clientLog.isLogEnabled())
             clientLog.close();
 
-        if (factory instanceof LifecycleAware)
-            ((LifecycleAware) factory).stop();
+        if (factory != null)
+            factory.stop();
 
         // Reset initialized resources.
         uri = null;
@@ -1359,6 +1362,6 @@ public class IgniteHadoopFileSystem extends FileSystem {
         if (factory == null)
             return null;
 
-        return factory.get(user);
+        return (FileSystem)factory.get(user);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index bd8ed2d..18b8bf9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -46,13 +46,15 @@ import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyInputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsProxyOutputStream;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsStreamDelegate;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper;
 import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
 import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
 import org.apache.ignite.internal.processors.igfs.IgfsPaths;
@@ -63,7 +65,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.BufferedOutputStream;
@@ -86,13 +87,13 @@ import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
 import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
 
 /**
@@ -169,7 +170,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     private IgfsModeResolver modeRslvr;
 
     /** The secondary file system factory. */
-    private HadoopFileSystemFactory factory;
+    private HadoopFileSystemFactoryDelegate factory;
 
     /** Whether custom sequential reads before prefetch value is provided. */
     private boolean seqReadsBeforePrefetchOverride;
@@ -341,7 +342,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
             if (initSecondary) {
                 try {
-                    factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+                    HadoopFileSystemFactory factory0 =
+                        (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+
+                    factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
                 }
                 catch (IgniteCheckedException e) {
                     throw new IOException("Failed to get secondary file system factory.", e);
@@ -354,11 +358,10 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
                 assert factory != null;
 
-                if (factory instanceof LifecycleAware)
-                    ((LifecycleAware) factory).start();
+                factory.start();
 
                 try {
-                    FileSystem secFs = factory.get(user);
+                    FileSystem secFs = (FileSystem)factory.get(user);
 
                     secondaryUri = secFs.getUri();
 
@@ -385,8 +388,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             if (clientLog.isLogEnabled())
                 clientLog.close();
 
-            if (factory instanceof LifecycleAware)
-                ((LifecycleAware) factory).stop();
+            if (factory != null)
+                factory.stop();
 
             // Reset initialized resources.
             rmtClient = null;
@@ -1071,6 +1074,6 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     private FileSystem secondaryFileSystem() throws IOException{
         assert factory != null;
 
-        return factory.get(user);
+        return (FileSystem)factory.get(user);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
index 583af35..343b5ed 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
@@ -17,10 +17,6 @@
 
 package org.apache.ignite.hadoop.mapreduce;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
@@ -32,10 +28,15 @@ import org.apache.ignite.internal.client.GridClientConfiguration;
 import org.apache.ignite.internal.client.GridClientException;
 import org.apache.ignite.internal.client.GridClientFactory;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
-import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol;
+import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
 import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
index d4a44fa..e1101c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -17,16 +17,6 @@
 
 package org.apache.ignite.hadoop.mapreduce;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -38,13 +28,23 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.UUID;
+
 import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 27ffc19..2d1ac0b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -24,11 +24,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
@@ -116,7 +116,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
     /** {@inheritDoc} */
     @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
         @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        List<HadoopInputSplit> splits = HadoopUtils.sortInputSplits(job.input());
+        List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input());
         int reducerCnt = job.info().reducers();
 
         if (reducerCnt < 0)

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
index 26dc4b2..12669aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/UserNameMapper.java
@@ -17,14 +17,12 @@
 
 package org.apache.ignite.hadoop.util;
 
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.Serializable;
 
 /**
- * Hadoop file system name mapper. Used by {@link HadoopFileSystemFactory} implementation to pass proper user names
- * to the underlying Hadoop file system.
+ * Hadoop file system name mapper. Ensures that correct user name is passed to the underlying Hadoop file system.
  */
 public interface UserNameMapper extends Serializable {
     /**


Mime
View raw message