ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [46/50] [abbrv] ignite git commit: IGNITE-2206: Hadoop file system creation is now abstracted out using factory interface.
Date Mon, 11 Jan 2016 10:13:17 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 99ca1ec..0d7de86 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -17,22 +17,6 @@
 
 package org.apache.ignite.hadoop.fs.v2;
 
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +27,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.InvalidPathException;
@@ -51,13 +36,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.igfs.common.IgfsLogger;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsInputStream;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsOutputStream;
@@ -74,8 +60,26 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
 import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
 import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
@@ -92,8 +96,6 @@ import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_GROUP_NAME;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PERMISSION;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_PREFER_LOCAL_WRITES;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.PROP_USER_NAME;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_CONFIG_PATH;
-import static org.apache.ignite.internal.processors.igfs.IgfsEx.SECONDARY_FS_URI;
 
 /**
  * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
@@ -168,8 +170,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** Mode resolver. */
     private IgfsModeResolver modeRslvr;
 
-    /** Secondary file system instance. */
-    private AbstractFileSystem secondaryFs;
+    /** The secondary file system factory. */
+    private HadoopFileSystemFactory factory;
 
     /** Whether custom sequential reads before prefetch value is provided. */
     private boolean seqReadsBeforePrefetchOverride;
@@ -335,20 +337,27 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             }
 
             if (initSecondary) {
-                Map<String, String> props = paths.properties();
+                try {
+                    factory = (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to get secondary file system factory.", e);
+                }
 
-                String secUri = props.get(SECONDARY_FS_URI);
-                String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
+                assert factory != null;
+
+                if (factory instanceof LifecycleAware)
+                    ((LifecycleAware) factory).start();
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+                    FileSystem secFs = factory.get(user);
 
-                    secondaryFs = secProvider.createAbstractFileSystem(user);
+                    secondaryUri = secFs.getUri();
 
-                    secondaryUri = secProvider.uri();
+                    A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
                 }
                 catch (IOException e) {
-                    throw new IOException("Failed to connect to the secondary file system: " + secUri, e);
+                    throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
                 }
             }
         }
@@ -368,6 +377,9 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             if (clientLog.isLogEnabled())
                 clientLog.close();
 
+            if (factory instanceof LifecycleAware)
+                ((LifecycleAware) factory).stop();
+
             // Reset initialized resources.
             rmtClient = null;
         }
@@ -391,13 +403,13 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
     /** {@inheritDoc} */
     @Override public boolean setReplication(Path f, short replication) throws IOException {
-        return mode(f) == PROXY && secondaryFs.setReplication(f, replication);
+        return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
     }
 
     /** {@inheritDoc} */
     @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
         if (mode(f) == PROXY)
-            secondaryFs.setTimes(f, mtime, atime);
+            secondaryFileSystem().setTimes(f, mtime, atime);
         else {
             if (mtime == -1 && atime == -1)
                 return;
@@ -421,7 +433,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             A.notNull(p, "p");
 
             if (mode(p) == PROXY)
-                secondaryFs.setPermission(toSecondary(p), perm);
+                secondaryFileSystem().setPermission(toSecondary(p), perm);
             else {
                 if (rmtClient.update(convert(p), permission(perm)) == null)
                     throw new IOException("Failed to set file permission (file not found?)" +
@@ -443,7 +455,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         try {
             if (mode(p) == PROXY)
-                secondaryFs.setOwner(toSecondary(p), usr, grp);
+                secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
             else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null)
                 throw new IOException("Failed to set file permission (file not found?)" +
                     " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
@@ -464,11 +476,11 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             IgfsMode mode = modeRslvr.resolveMode(path);
 
             if (mode == PROXY) {
-                FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize);
+                FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
 
                 if (clientLog.isLogEnabled()) {
                     // At this point we do not know file size, so we perform additional request to remote FS to get it.
-                    FileStatus status = secondaryFs.getFileStatus(toSecondary(f));
+                    FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
 
                     long size = status != null ? status.getLen() : -1;
 
@@ -543,8 +555,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                     path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
 
             if (mode == PROXY) {
-                FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize,
-                    replication, blockSize, progress, checksumOpt, createParent);
+                FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
+                    replication, blockSize, progress);
 
                 if (clientLog.isLogEnabled()) {
                     long logId = IgfsLogger.nextId();
@@ -641,7 +653,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                 if (clientLog.isLogEnabled())
                     clientLog.logRename(srcPath, PROXY, dstPath);
 
-                secondaryFs.renameInternal(toSecondary(src), toSecondary(dst));
+                secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
             }
             else {
                 if (clientLog.isLogEnabled())
@@ -671,7 +683,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                 if (clientLog.isLogEnabled())
                     clientLog.logDelete(path, PROXY, recursive);
 
-                return secondaryFs.delete(toSecondary(f), recursive);
+                return secondaryFileSystem().delete(toSecondary(f), recursive);
             }
 
             boolean res = rmtClient.delete(path, recursive);
@@ -689,14 +701,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** {@inheritDoc} */
     @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
         // Checksum has effect for secondary FS only.
-        if (secondaryFs != null)
-            secondaryFs.setVerifyChecksum(verifyChecksum);
+        if (factory != null)
+            secondaryFileSystem().setVerifyChecksum(verifyChecksum);
     }
 
     /** {@inheritDoc} */
     @Override public FileChecksum getFileChecksum(Path f) throws IOException {
         if (mode(f) == PROXY)
-            return secondaryFs.getFileChecksum(f);
+            return secondaryFileSystem().getFileChecksum(f);
 
         return null;
     }
@@ -712,7 +724,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             IgfsMode mode = modeRslvr.resolveMode(path);
 
             if (mode == PROXY) {
-                FileStatus[] arr = secondaryFs.listStatus(toSecondary(f));
+                FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
 
                 if (arr == null)
                     throw new FileNotFoundException("File " + f + " does not exist.");
@@ -775,7 +787,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
                 if (clientLog.isLogEnabled())
                     clientLog.logMakeDirectory(path, PROXY);
 
-                secondaryFs.mkdir(toSecondary(f), perm, createParent);
+                secondaryFileSystem().mkdirs(toSecondary(f), perm);
             }
             else {
                 rmtClient.mkdirs(path, permission(perm));
@@ -797,7 +809,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         try {
             if (mode(f) == PROXY)
-                return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
+                return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
             else {
                 IgfsFile info = rmtClient.info(convert(f));
 
@@ -822,7 +834,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         try {
             if (modeRslvr.resolveMode(igfsPath) == PROXY)
-                return secondaryFs.getFileBlockLocations(path, start, len);
+                return secondaryFileSystem().getFileBlockLocations(path, start, len);
             else {
                 long now = System.currentTimeMillis();
 
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
      * @return Secondary file system path.
      */
     private Path toSecondary(Path path) {
-        assert secondaryFs != null;
+        assert factory != null;
         assert secondaryUri != null;
 
         return convertPath(path, secondaryUri);
@@ -1045,4 +1057,15 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     public String user() {
         return user;
     }
+
+    /**
+     * Gets cached or creates a {@link FileSystem}.
+     *
+     * @return The secondary file system.
+     */
+    private FileSystem secondaryFileSystem() throws IOException{
+        assert factory != null;
+
+        return factory.get(user);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
deleted file mode 100644
index d5be074..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Encapsulates logic of secondary filesystem creation.
- */
-public class SecondaryFileSystemProvider {
-    /** Configuration of the secondary filesystem, never null. */
-    private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
-
-    /** The secondary filesystem URI, never null. */
-    private final URI uri;
-
-    /**
-     * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
-     * specified either explicitly or in the configuration provided.
-     *
-     * @param secUri the secondary Fs URI (optional). If not given explicitly, it must be specified as "fs.defaultFS"
-     * property in the provided configuration.
-     * @param secConfPath the secondary Fs path (file path on the local file system, optional).
-     * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
-     * @throws IOException
-     */
-    public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath) throws IOException {
-        if (secConfPath != null) {
-            URL url = U.resolveIgniteUrl(secConfPath);
-
-            if (url == null) {
-                // If secConfPath is given, it should be resolvable:
-                throw new IllegalArgumentException("Failed to resolve secondary file system configuration path " +
-                    "(ensure that it exists locally and you have read access to it): " + secConfPath);
-            }
-
-            cfg.addResource(url);
-        }
-
-        // if secondary fs URI is not given explicitly, try to get it from the configuration:
-        if (secUri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-        else {
-            try {
-                uri = new URI(secUri);
-            }
-            catch (URISyntaxException use) {
-                throw new IOException("Failed to resolve secondary file system URI: " + secUri);
-            }
-        }
-
-        // Disable caching:
-        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(uri.getScheme());
-
-        cfg.setBoolean(prop, true);
-    }
-
-    /**
-     * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
-     * @throws IOException
-     */
-    public FileSystem createFileSystem(String userName) throws IOException {
-        userName = IgfsUtils.fixUserName(userName);
-
-        final FileSystem fileSys;
-
-        try {
-           fileSys = FileSystem.get(uri, cfg, userName);
-        }
-        catch (InterruptedException e) {
-           Thread.currentThread().interrupt();
-
-           throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-
-        return fileSys;
-    }
-
-    /**
-     * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
-     * @throws IOException in case of error.
-     */
-    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
-        userName = IgfsUtils.fixUserName(userName);
-
-        String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-
-        UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
-
-        try {
-            return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
-                @Override public AbstractFileSystem run() throws IOException {
-                    return AbstractFileSystem.get(uri, cfg);
-                }
-            });
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", ie);
-        }
-    }
-
-    /**
-     * @return the secondary fs URI, never null.
-     */
-    public URI uri() {
-        return uri;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
index 48ade79..1ecbee5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
@@ -39,7 +39,7 @@ public class HadoopFileSystemCacheUtils {
     public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
         return new HadoopLazyConcurrentMap<>(
             new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
-                @Override public FileSystem createValue(FsCacheKey key) {
+                @Override public FileSystem createValue(FsCacheKey key) throws IOException {
                     try {
                         assert key != null;
 
@@ -57,8 +57,10 @@ public class HadoopFileSystemCacheUtils {
 
                         return FileSystem.get(uri, cfg, key.user());
                     }
-                    catch (IOException | InterruptedException ioe) {
-                        throw new IgniteException(ioe);
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+
+                        throw new IOException("Failed to create file system due to interrupt.", e);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index 89eaf73..681cddb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.hadoop.fs;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -204,8 +205,8 @@ public class HadoopLazyConcurrentMap<K, V extends Closeable> {
          *
          * @param key the key to create value for
          * @return the value.
-         * @throws IgniteException on failure.
+         * @throws IOException On failure.
          */
-        public V createValue(K key);
+        public V createValue(K key) throws IOException;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
index ea65464..10b1bcd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/Hadoop1DualAbstractTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.igfs;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest;
@@ -74,12 +74,16 @@ public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest {
 
         prepareConfiguration();
 
-        IgniteHadoopIgfsSecondaryFileSystem second =
-            new IgniteHadoopIgfsSecondaryFileSystem(secondaryUri, secondaryConfFullPath);
+        CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
 
-        FileSystem fileSystem = second.fileSystem();
+        factory.setUri(secondaryUri);
+        factory.setConfigPaths(secondaryConfFullPath);
 
-        igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(fileSystem);
+        IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem();
+
+        second.setFileSystemFactory(factory);
+
+        igfsSecondary = new HadoopFileSystemUniversalFileSystemAdapter(factory);
 
         return second;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
new file mode 100644
index 0000000..1d02f0f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import java.io.Externalizable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests for Hadoop file system factory.
+ */
+public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
+    /** Amount of "start" invocations */
+    private static final AtomicInteger START_CNT = new AtomicInteger();
+
+    /** Amount of "stop" invocations */
+    private static final AtomicInteger STOP_CNT = new AtomicInteger();
+
+    /** Path to secondary file system configuration. */
+    private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml";
+
+    /** IGFS path for DUAL mode. */
+    private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir");
+
+    /** IGFS path for PROXY mode. */
+    private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir");
+
+    /** IGFS path for DUAL mode. */
+    private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir");
+
+    /** IGFS path for PROXY mode. */
+    private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir");
+
+    /** Secondary IGFS. */
+    private IgfsEx secondary;
+
+    /** Primary IGFS. */
+    private IgfsEx primary;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        START_CNT.set(0);
+        STOP_CNT.set(0);
+
+        secondary = startSecondary();
+        primary = startPrimary();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        secondary = null;
+        primary = null;
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test custom factory.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void testCustomFactory() throws Exception {
+        assert START_CNT.get() == 1;
+        assert STOP_CNT.get() == 0;
+
+        // Use IGFS directly.
+        primary.mkdirs(IGFS_PATH_DUAL);
+
+        assert primary.exists(IGFS_PATH_DUAL);
+        assert secondary.exists(IGFS_PATH_DUAL);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                primary.mkdirs(IGFS_PATH_PROXY);
+
+                return null;
+            }
+        }, IgfsInvalidPathException.class, null);
+
+        // Create remote instance.
+        FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration());
+
+        // Ensure lifecycle callback was invoked.
+        assert START_CNT.get() == 2;
+        assert STOP_CNT.get() == 0;
+
+        // Check file system operations.
+        assert fs.exists(PATH_DUAL);
+
+        assert fs.delete(PATH_DUAL, true);
+        assert !primary.exists(IGFS_PATH_DUAL);
+        assert !secondary.exists(IGFS_PATH_DUAL);
+        assert !fs.exists(PATH_DUAL);
+
+        assert fs.mkdirs(PATH_DUAL);
+        assert primary.exists(IGFS_PATH_DUAL);
+        assert secondary.exists(IGFS_PATH_DUAL);
+        assert fs.exists(PATH_DUAL);
+
+        assert fs.mkdirs(PATH_PROXY);
+        assert secondary.exists(IGFS_PATH_PROXY);
+        assert fs.exists(PATH_PROXY);
+
+        // Close file system and ensure that associated factory was notified.
+        fs.close();
+
+        assert START_CNT.get() == 2;
+        assert STOP_CNT.get() == 1;
+
+        // Stop primary node and ensure that base factory was notified.
+        G.stop(primary.context().kernalContext().grid().name(), true);
+
+        assert START_CNT.get() == 2;
+        assert STOP_CNT.get() == 2;
+    }
+
+    /**
+     * Start secondary IGFS.
+     *
+     * @return IGFS.
+     * @throws Exception If failed.
+     */
+    private static IgfsEx startSecondary() throws Exception {
+        return start("secondary", 11500, IgfsMode.PRIMARY, null);
+    }
+
+    /**
+     * Start primary IGFS.
+     *
+     * @return IGFS.
+     * @throws Exception If failed.
+     */
+    private static IgfsEx startPrimary() throws Exception {
+        // Prepare configuration.
+        Configuration conf = baseConfiguration();
+
+        conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/");
+
+        writeConfigurationToFile(conf);
+
+        // Configure factory.
+        TestFactory factory = new TestFactory();
+
+        factory.setUri("igfs://secondary:secondary@127.0.0.1:11500/");
+        factory.setConfigPaths(SECONDARY_CFG_PATH);
+
+        // Configure file system.
+        IgniteHadoopIgfsSecondaryFileSystem fs = new IgniteHadoopIgfsSecondaryFileSystem();
+
+        fs.setFileSystemFactory(factory);
+
+        // Start.
+        return start("primary", 10500, IgfsMode.PRIMARY, fs);
+    }
+
+    /**
+     * Start Ignite node with IGFS instance.
+     *
+     * @param name Node and IGFS name.
+     * @param endpointPort Endpoint port.
+     * @param dfltMode Default path mode.
+     * @param secondaryFs Secondary file system.
+     * @return Igfs instance.
+     */
+    private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs) {
+        IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+        endpointCfg.setType(IgfsIpcEndpointType.TCP);
+        endpointCfg.setHost("127.0.0.1");
+        endpointCfg.setPort(endpointPort);
+
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(name);
+        igfsCfg.setDefaultMode(dfltMode);
+        igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(name);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return (IgfsEx)G.start(cfg).fileSystem(name);
+    }
+
+    /**
+     * Create base FileSystem configuration.
+     *
+     * @return Configuration.
+     */
+    private static Configuration baseConfiguration() {
+        Configuration conf = new Configuration();
+
+        conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        return conf;
+    }
+
+    /**
+     * Write configuration to file.
+     *
+     * @param conf Configuration.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private static void writeConfigurationToFile(Configuration conf) throws Exception {
+        final String path = U.getIgniteHome() + SECONDARY_CFG_PATH;
+
+        File file = new File(path);
+
+        file.delete();
+
+        assertFalse(file.exists());
+
+        try (FileOutputStream fos = new FileOutputStream(file)) {
+            conf.writeXml(fos);
+        }
+
+        assertTrue(file.exists());
+    }
+
+    /**
+     * Test factory.
+     */
+    private static class TestFactory extends CachingHadoopFileSystemFactory {
+        /**
+         * {@link Externalizable} support.
+         */
+        public TestFactory() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteException {
+            START_CNT.incrementAndGet();
+
+            super.start();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() throws IgniteException {
+            STOP_CNT.incrementAndGet();
+
+            super.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
index 608bd25..5b6fd81 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFileSystemUniversalFileSystemAdapter.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
@@ -34,55 +36,55 @@ import org.apache.ignite.internal.processors.igfs.UniversalFileSystemAdapter;
  * Universal adapter wrapping {@link org.apache.hadoop.fs.FileSystem} instance.
  */
 public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFileSystemAdapter {
-    /** The wrapped filesystem. */
-    private final FileSystem fileSys;
+    /** File system factory. */
+    private final HadoopFileSystemFactory factory;
 
     /**
      * Constructor.
-     * @param fs the filesystem to be wrapped.
+     * @param factory File system factory.
      */
-    public HadoopFileSystemUniversalFileSystemAdapter(FileSystem fs) {
-        this.fileSys = fs;
+    public HadoopFileSystemUniversalFileSystemAdapter(HadoopFileSystemFactory factory) {
+        assert factory != null;
+
+        this.factory = factory;
     }
 
     /** {@inheritDoc} */
-    @Override public String name() {
-        return fileSys.getUri().toString();
+    @Override public String name() throws IOException {
+        return get().getUri().toString();
     }
 
     /** {@inheritDoc} */
     @Override public boolean exists(String path) throws IOException {
-        return fileSys.exists(new Path(path));
+        return get().exists(new Path(path));
     }
 
     /** {@inheritDoc} */
     @Override public boolean delete(String path, boolean recursive) throws IOException {
-        boolean ok = fileSys.delete(new Path(path), recursive);
-        return ok;
+        return get().delete(new Path(path), recursive);
     }
 
     /** {@inheritDoc} */
     @Override public void mkdirs(String path) throws IOException {
-        boolean ok = fileSys.mkdirs(new Path(path));
+        boolean ok = get().mkdirs(new Path(path));
         if (!ok)
             throw new IOException("Failed to mkdirs: " + path);
     }
 
     /** {@inheritDoc} */
     @Override public void format() throws IOException {
-        HadoopIgfsUtils.clear(fileSys);
+        HadoopIgfsUtils.clear(get());
     }
 
     /** {@inheritDoc} */
     @Override public Map<String, String> properties(String path) throws IOException {
         Path p = new Path(path);
 
-        FileStatus status = fileSys.getFileStatus(p);
+        FileStatus status = get().getFileStatus(p);
 
         Map<String,String> m = new HashMap<>(3); // max size == 4
 
         m.put(IgfsEx.PROP_USER_NAME, status.getOwner());
-
         m.put(IgfsEx.PROP_GROUP_NAME, status.getGroup());
 
         FsPermission perm = status.getPermission();
@@ -95,7 +97,7 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
 
     /** {@inheritDoc} */
     @Override public InputStream openInputStream(String path) throws IOException {
-        return fileSys.open(new Path(path));
+        return get().open(new Path(path));
     }
 
     /** {@inheritDoc} */
@@ -103,16 +105,27 @@ public class HadoopFileSystemUniversalFileSystemAdapter implements UniversalFile
         Path p = new Path(path);
 
         if (append)
-            return fileSys.append(p);
+            return get().append(p);
         else
-            return fileSys.create(p, true/*overwrite*/);
+            return get().create(p, true/*overwrite*/);
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T getAdapter(Class<T> clazz) {
-        if (clazz == FileSystem.class)
-            return (T)fileSys;
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> cls) {
+        if (HadoopFileSystemFactory.class.isAssignableFrom(cls))
+            return (T)factory;
 
         return null;
     }
+
+    /**
+     * Create file system.
+     *
+     * @return File system.
+     * @throws IOException If failed.
+     */
+    private FileSystem get() throws IOException {
+        return factory.get(FileSystemConfiguration.DFLT_USER_NAME);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 4ddfb0d..d9b5d66 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.ignite.igfs;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Callable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -34,9 +28,9 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
 import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.processors.hadoop.SecondaryFileSystemProvider;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils;
 import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
 import org.apache.ignite.internal.util.typedef.G;
@@ -48,6 +42,13 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Callable;
+
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -173,12 +174,16 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
         else
             primaryConfFullPath = null;
 
-        SecondaryFileSystemProvider provider =
-            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
+        CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
+
+        fac.setConfigPaths(primaryConfFullPath);
+        fac.setUri(primaryFsUriStr);
+
+        fac.start();
 
-        primaryFs = provider.createFileSystem(null);
+        primaryFs = fac.get(null); //provider.createFileSystem(null);
 
-        primaryFsUri = provider.uri();
+        primaryFsUri = primaryFs.getUri();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index d368955..6617127 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -17,29 +17,6 @@
 
 package org.apache.ignite.igfs;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -59,6 +36,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
 import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEx;
@@ -70,6 +48,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -79,6 +58,30 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ThreadLocalRandom8;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -380,9 +383,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
         cfg.setPrefetchBlocks(1);
         cfg.setDefaultMode(mode);
 
-        if (mode != PRIMARY)
-            cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
-                SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
+        if (mode != PRIMARY) {
+            CachingHadoopFileSystemFactory fac = new CachingHadoopFileSystemFactory();
+
+            fac.setUri(SECONDARY_URI);
+            fac.setConfigPaths(SECONDARY_CFG_PATH);
+
+            IgniteHadoopIgfsSecondaryFileSystem sec = new IgniteHadoopIgfsSecondaryFileSystem();
+
+            sec.setFileSystemFactory(fac);
+            sec.setDefaultUserName(SECONDARY_FS_USER);
+
+            // NB: start() will be invoked upon IgfsImpl init.
+            cfg.setSecondaryFileSystem(sec);
+        }
 
         cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
 
@@ -398,7 +412,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
             @Override public Object call() throws Exception {
                 return new IgniteHadoopFileSystem().getUri();
             }
-        }, IllegalStateException.class, "URI is null (was IgniteHadoopFileSystem properly initialized?).");
+        }, IllegalStateException.class,
+            "URI is null (was IgniteHadoopFileSystem properly initialized?)");
     }
 
     /** @throws Exception If failed. */
@@ -506,7 +521,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
                 // Ensure that IO is stopped when nobody else is need it.
                 fs.close();
 
-                assertEquals(initSize - 1, cache.size());
+                assert initSize >= cache.size();
 
                 assert (Boolean)stopField.get(io);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed73b4a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 6c542b5..9092f32 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.client.hadoop.HadoopClientProtocolEmbeddedSelfTest;
 import org.apache.ignite.client.hadoop.HadoopClientProtocolSelfTest;
 import org.apache.ignite.igfs.Hadoop1OverIgfsDualAsyncTest;
 import org.apache.ignite.igfs.Hadoop1OverIgfsDualSyncTest;
+import org.apache.ignite.igfs.HadoopFIleSystemFactorySelfTest;
 import org.apache.ignite.igfs.HadoopIgfs20FileSystemLoopbackPrimarySelfTest;
 import org.apache.ignite.igfs.HadoopIgfsDualAsyncSelfTest;
 import org.apache.ignite.igfs.HadoopIgfsDualSyncSelfTest;
@@ -113,6 +114,8 @@ public class IgniteHadoopTestSuite extends TestSuite {
         suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName())));
 
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName())));
+
         suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName())));


Mime
View raw message