ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-3961: IGFS: Added IgfsSecondaryFileSystem.affinity() method. This closes #1114. This closes #1252.
Date Wed, 28 Dec 2016 13:47:37 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 a61b0eaff -> 2df39a80d


IGNITE-3961: IGFS: Added IgfsSecondaryFileSystem.affinity() method. This closes #1114. This closes #1252.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2df39a80
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2df39a80
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2df39a80

Branch: refs/heads/ignite-2.0
Commit: 2df39a80d80e2575be61a902ccd48615796fcde9
Parents: a61b0ea
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Wed Dec 28 16:47:24 2016 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed Dec 28 16:47:24 2016 +0300

----------------------------------------------------------------------
 .../igfs/IgfsGroupDataBlocksKeyMapper.java      |  17 +-
 .../igfs/secondary/IgfsSecondaryFileSystem.java |  18 ++
 .../local/LocalIgfsSecondaryFileSystem.java     |  96 ++++++-
 .../processors/igfs/IgfsBaseBlockKey.java       |  42 +++
 .../internal/processors/igfs/IgfsBlockKey.java  |  26 +-
 .../processors/igfs/IgfsBlockLocationImpl.java  |  55 ++++
 .../processors/igfs/IgfsDataManager.java        |  12 +-
 .../internal/processors/igfs/IgfsImpl.java      |  12 +-
 .../processors/igfs/IgfsKernalContextAware.java |  32 ---
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   7 +
 .../local/LocalFileSystemBlockKey.java          | 103 +++++++
 .../LocalFileSystemPositionedReadable.java      |  65 +++++
 ...fsSecondaryFileSystemPositionedReadable.java |  65 -----
 .../processors/resource/GridResourceIoc.java    |   6 +-
 .../resource/GridResourceProcessor.java         |  31 ++-
 .../ignite/resources/FileSystemResource.java    |  62 +++++
 .../processors/igfs/IgfsAbstractSelfTest.java   |   2 +-
 .../igfs/IgfsDualAbstractSelfTest.java          |  14 +-
 ...fsLocalSecondaryFileSystemProxySelfTest.java |  81 ++++++
 ...gfsSecondaryFileSystemInjectionSelfTest.java | 270 +++++++++++++++++++
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |   3 +
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java |  21 +-
 ...doopIgfsSecondaryFileSystemDelegateImpl.java |  61 ++++-
 .../impl/igfs/Hadoop1OverIgfsProxyTest.java     |  67 +++++
 .../testsuites/IgniteHadoopTestSuite.java       |   2 +
 25 files changed, 1031 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
index b35b692..09143d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
@@ -18,9 +18,10 @@
 package org.apache.ignite.igfs;
 
 import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
-import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.processors.igfs.IgfsBaseBlockKey;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
 
 /**
  * {@code IGFS} class providing ability to group file's data blocks together on one node.
@@ -84,15 +85,17 @@ public class IgfsGroupDataBlocksKeyMapper extends GridCacheDefaultAffinityKeyMap
 
     /** {@inheritDoc} */
     @Override public Object affinityKey(Object key) {
-        if (key != null && IgfsBlockKey.class.equals(key.getClass())) {
-            IgfsBlockKey blockKey = (IgfsBlockKey)key;
+        if (key instanceof IgfsBaseBlockKey) {
+            IgfsBaseBlockKey blockKey = (IgfsBaseBlockKey)key;
 
-            if (blockKey.affinityKey() != null)
-                return blockKey.affinityKey();
+            IgniteUuid affKey = blockKey.affinityKey();
 
-            long grpId = blockKey.getBlockId() / grpSize;
+            if (affKey != null)
+                return affKey;
 
-            return blockKey.getFileId().hashCode() + (int)(grpId ^ (grpId >>> 32));
+            long grpId = blockKey.blockId() / grpSize;
+
+            return blockKey.fileHash() + (int)(grpId ^ (grpId >>> 32));
         }
 
         return super.affinityKey(key);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 37c9c7d..76ba454 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -21,8 +21,10 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -202,4 +204,20 @@ public interface IgfsSecondaryFileSystem {
      * @throws IgniteException If failed.
      */
     public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException;
+
+     /**
+     * Get affinity block locations for data blocks of the file. In case {@code maxLen} parameter is set and
+     * particular block location length is greater than this value, block locations will be split into smaller
+     * chunks.
+     *
+     * @param path File path to get affinity for.
+     * @param start Position in the file to start affinity resolution from.
+     * @param len Size of data in the file to resolve affinity for.
+     * @param maxLen Maximum length of a single returned block location.
+     * @return Affinity block locations.
+     * @throws IgniteException In case of error.
+     * @throws IgfsPathNotFoundException If path doesn't exist.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen)
+        throws IgniteException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
index 18d31de..86f7387 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.igfs.secondary.local;
 
+import java.util.ArrayList;
 import java.nio.file.attribute.BasicFileAttributeView;
 import java.nio.file.attribute.FileTime;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsPath;
@@ -29,14 +33,20 @@ import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException;
 import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.IgfsDataManager;
+import org.apache.ignite.internal.processors.igfs.IgfsImpl;
+import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemBlockKey;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
 import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemIgfsFile;
 import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemSizeVisitor;
 import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemUtils;
-import org.apache.ignite.internal.processors.igfs.secondary.local.LocalIgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemPositionedReadable;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.FileSystemResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.File;
@@ -61,6 +71,16 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
     /** Path that will be added to each passed path. */
     private String workDir;
 
+    /** Logger. */
+    @SuppressWarnings("unused")
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** IGFS instance. */
+    @SuppressWarnings("unused")
+    @FileSystemResource
+    private IgfsImpl igfs;
+
     /**
      * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
      *
@@ -258,7 +278,7 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
         try {
             FileInputStream in = new FileInputStream(fileForPath(path));
 
-            return new LocalIgfsSecondaryFileSystemPositionedReadable(in, bufSize);
+            return new LocalFileSystemPositionedReadable(in, bufSize);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to open file for read: " + path);
@@ -402,6 +422,78 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+        long maxLen) throws IgniteException {
+        File f = fileForPath(path);
+
+        if (!f.exists())
+            throw new IgfsPathNotFoundException("File not found: " + path);
+
+        // Create fake block & fake affinity for blocks
+        long blockSize = igfs.configuration().getBlockSize();
+
+        if (maxLen <= 0)
+            maxLen = Long.MAX_VALUE;
+
+        assert maxLen > 0 : "maxLen : " + maxLen;
+
+        long end = start + len;
+
+        Collection<IgfsBlockLocation> blocks = new ArrayList<>((int)(len / maxLen));
+
+        IgfsDataManager data = igfs.context().data();
+
+        Collection<ClusterNode> lastNodes = null;
+
+        long lastBlockIdx = -1;
+
+        IgfsBlockLocationImpl lastBlock = null;
+
+        for (long offset = start; offset < end; ) {
+            long blockIdx = offset / blockSize;
+
+            // Each step is min of maxLen and end of block.
+            long lenStep = Math.min(
+                maxLen - (lastBlock != null ? lastBlock.length() : 0),
+                (blockIdx + 1) * blockSize - offset);
+
+            lenStep = Math.min(lenStep, end - offset);
+
+            // Create fake affinity key to map blocks of secondary filesystem to nodes.
+            LocalFileSystemBlockKey affKey = new LocalFileSystemBlockKey(path, blockIdx);
+
+            if (blockIdx != lastBlockIdx) {
+                Collection<ClusterNode> nodes = data.affinityNodes(affKey);
+
+                if (!nodes.equals(lastNodes) && lastNodes != null && lastBlock != null) {
+                    blocks.add(lastBlock);
+
+                    lastBlock = null;
+                }
+
+                lastNodes = nodes;
+
+                lastBlockIdx = blockIdx;
+            }
+
+            if(lastBlock == null)
+                lastBlock = new IgfsBlockLocationImpl(offset, lenStep, lastNodes);
+            else
+                lastBlock.increaseLength(lenStep);
+
+            if (lastBlock.length() == maxLen || lastBlock.start() + lastBlock.length() == end) {
+                blocks.add(lastBlock);
+
+                lastBlock = null;
+            }
+
+            offset += lenStep;
+       }
+
+        return blocks;
+    }
+
     /**
      * Get work directory.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java
new file mode 100644
index 0000000..05ef086
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base interface for block keys used by the {@link IgfsGroupDataBlocksKeyMapper}.
+ */
+public interface IgfsBaseBlockKey {
+    /**
+     * @return Block ID.
+     */
+    public long blockId();
+
+    /**
+     * @return Hash based on a file identifier (path, ID, etc).
+     */
+    public int fileHash();
+
+    /**
+     * @return Block affinity key (if any).
+     */
+    @Nullable public IgniteUuid affinityKey();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
index c366ae3..414f6b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
@@ -44,8 +44,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  * File's binary data block key.
  */
-@GridInternal
-public final class IgfsBlockKey implements Message, Externalizable, Binarylizable, Comparable<IgfsBlockKey> {
+public final class IgfsBlockKey implements IgfsBaseBlockKey, Message, Externalizable, Binarylizable,
+    Comparable<IgfsBlockKey> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -93,13 +93,21 @@ public final class IgfsBlockKey implements Message, Externalizable, Binarylizabl
         return fileId;
     }
 
-    /**
-     * @return Block affinity key.
-     */
-    public IgniteUuid affinityKey() {
+    /** {@inheritDoc} */
+    @Override public IgniteUuid affinityKey() {
         return affKey;
     }
 
+    /** {@inheritDoc} */
+    @Override public long blockId() {
+        return blockId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int fileHash() {
+        return fileId.hashCode();
+    }
+
     /**
      * @return Evict exclude flag.
      */
@@ -107,12 +115,6 @@ public final class IgfsBlockKey implements Message, Externalizable, Binarylizabl
         return evictExclude;
     }
 
-    /**
-     * @return Block ID.
-     */
-    public long getBlockId() {
-        return blockId;
-    }
 
     /** {@inheritDoc} */
     @Override public void onAckReceived() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
index 2d4a0af..3f5d9fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * File block location in the grid.
@@ -61,6 +63,7 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable,
     private Collection<String> names;
 
     /** */
+    @GridToStringInclude
     private Collection<String> hosts;
 
     /**
@@ -102,6 +105,44 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable,
     }
 
     /**
+     * @param start Start.
+     * @param len Length.
+     * @param block Block.
+     */
+    public IgfsBlockLocationImpl(long start, long len, IgfsBlockLocation block) {
+        assert start >= 0;
+        assert len > 0;
+
+        this.start = start;
+        this.len = len;
+
+        nodeIds = block.nodeIds();
+        names = block.names();
+        hosts = block.hosts();
+    }
+
+    /**
+     * @param start Start.
+     * @param len Length.
+     * @param names Collection of host:port addresses.
+     * @param hosts Collection of host:port addresses.
+     */
+    public IgfsBlockLocationImpl(long start, long len, Collection<String> names, Collection<String> hosts) {
+        assert start >= 0;
+        assert len > 0;
+        assert names != null && !names.isEmpty();
+        assert hosts != null && !hosts.isEmpty();
+
+        this.start = start;
+        this.len = len;
+
+        nodeIds = Collections.emptySet();
+
+        this.names = names;
+        this.hosts = hosts;
+    }
+
+    /**
      * @return Start position.
      */
     @Override public long start() {
@@ -116,6 +157,20 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable,
     }
 
     /**
+     * @param addLen Length to increase.
+     */
+    public void increaseLength(long addLen) {
+        len += addLen;
+    }
+
+    /**
+     * @param len Block length.
+     */
+    public void length(long len) {
+        this.len = len;
+    }
+
+    /**
      * @return Node IDs.
      */
     @Override public Collection<UUID> nodeIds() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 4490a68..d6297b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -278,6 +278,16 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
+     * Maps affinity key to nodes.
+     *
+     * @param affinityKey Affinity key to map.
+     * @return Primary and backup nodes for this key.
+     */
+    public Collection<ClusterNode> affinityNodes(Object affinityKey) {
+        return dataCache.affinity().mapKeyToPrimaryAndBackups(affinityKey);
+    }
+
+    /**
      * Creates new instance of explicit data streamer.
      *
      * @return New instance of data streamer.
@@ -1045,7 +1055,7 @@ public class IgfsDataManager extends IgfsManager {
 
         // Create non-colocated key.
         IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null,
-            colocatedKey.evictExclude(), colocatedKey.getBlockId());
+            colocatedKey.evictExclude(), colocatedKey.blockId());
 
         try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
             // Lock keys.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 01e434f..59674f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -179,8 +179,11 @@ public final class IgfsImpl implements IgfsEx {
         data = igfsCtx.data();
         secondaryFs = cfg.getSecondaryFileSystem();
 
-        if (secondaryFs instanceof IgfsKernalContextAware)
-            ((IgfsKernalContextAware)secondaryFs).setKernalContext(igfsCtx.kernalContext());
+        if (secondaryFs != null) {
+            igfsCtx.kernalContext().resource().injectGeneric(secondaryFs);
+
+            igfsCtx.kernalContext().resource().injectFileSystem(secondaryFs, this);
+        }
 
         if (secondaryFs instanceof LifecycleAware)
             ((LifecycleAware)secondaryFs).start();
@@ -635,7 +638,7 @@ public final class IgfsImpl implements IgfsEx {
                         IgfsFile file = secondaryFs.update(path, props);
 
                         if (file != null)
-                            return new IgfsFileImpl(secondaryFs.update(path, props), data.groupBlockSize());
+                            return new IgfsFileImpl(file, data.groupBlockSize());
                 }
 
                 return null;
@@ -1263,6 +1266,9 @@ public final class IgfsImpl implements IgfsEx {
 
                 IgfsMode mode = resolveMode(path);
 
+                if (mode == PROXY)
+                    return secondaryFs.affinity(path, start, len, maxLen);
+
                 // Check memory first.
                 IgfsEntryInfo info = meta.infoForPath(path);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java
deleted file mode 100644
index 7f59db4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import org.apache.ignite.internal.GridKernalContext;
-
-/**
- * Indicates whether particular file system accepts kernal context.
- */
-public interface IgfsKernalContextAware {
-    /**
-     * Set kernal context.
-     *
-     * @param ctx Kernal context.
-     */
-    public void setKernalContext(GridKernalContext ctx);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 4e14b46..1c135fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -121,4 +122,10 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
         igfs.setTimes(path, accessTime, modificationTime);
     }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+        long maxLen) throws IgniteException {
+        return igfs.affinity(path, start, len, maxLen);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java
new file mode 100644
index 0000000..e3990df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.secondary.local;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsBaseBlockKey;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * File's binary data block key.
+ */
+public final class LocalFileSystemBlockKey implements IgfsBaseBlockKey, Comparable<LocalFileSystemBlockKey> {
+    /** IGFS path. */
+    private IgfsPath path;
+
+    /** Block ID. */
+    private long blockId;
+
+    /**
+     * Constructs file's binary data block key.
+     *
+     * @param path IGFS path.
+     * @param blockId Block ID.
+     */
+    public LocalFileSystemBlockKey(IgfsPath path, long blockId) {
+        assert path != null;
+        assert blockId >= 0;
+
+        this.path = path;
+        this.blockId = blockId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blockId() {
+        return blockId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int fileHash() {
+        return path.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid affinityKey() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull LocalFileSystemBlockKey o) {
+        int res = path.compareTo(o.path);
+
+        if (res != 0)
+            return res;
+
+        long v1 = blockId;
+        long v2 = o.blockId;
+
+        if (v1 != v2)
+            return v1 > v2 ? 1 : -1;
+
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return path.hashCode() + (int)(blockId ^ (blockId >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || !(o instanceof LocalFileSystemBlockKey))
+            return false;
+
+        LocalFileSystemBlockKey that = (LocalFileSystemBlockKey)o;
+
+        return blockId == that.blockId && path.equals(that.path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(LocalFileSystemBlockKey.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java
new file mode 100644
index 0000000..6bdba95
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.secondary.local;
+
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Positioned readable for the local secondary file system, implemented on top of a buffered
+ * {@link FileInputStream}.
+ * <p>
+ * The stream keeps track of its own logical file position, so sequential reads and forward jumps
+ * that stay inside the already buffered data are served from the buffer, while any other request
+ * re-positions the underlying file channel. The tracking is only valid while the stream is consumed
+ * exclusively through {@link #read(long, byte[], int, int)}.
+ */
+public class LocalFileSystemPositionedReadable extends BufferedInputStream
+    implements IgfsSecondaryFileSystemPositionedReadable {
+    /** File offset of the next byte this stream will return (logical stream position). */
+    private long lastReadPos;
+
+    /**
+     * Constructor.
+     *
+     * @param in Input stream; position tracking starts from offset {@code 0}.
+     * @param bufSize Buffer size.
+     */
+    public LocalFileSystemPositionedReadable(FileInputStream in, int bufSize) {
+        super(in, bufSize);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Data is always taken starting at {@code readPos}. If that offset is already covered by the
+     * buffered data, the buffer is advanced in place; otherwise the buffer is discarded and the file
+     * channel is moved to {@code readPos}.
+     *
+     * @throws IOException If the stream is closed or an I/O error occurs.
+     */
+    @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException {
+        if (in == null)
+            throw new IOException("Stream is closed.");
+
+        // Distance from the current logical stream position to the requested one.
+        long delta = readPos - lastReadPos;
+
+        if (delta >= 0 && delta <= count - pos) {
+            // Requested offset is inside the buffered data: skip forward without touching the file.
+            pos += (int)delta;
+        }
+        else {
+            ((FileInputStream)in).getChannel().position(readPos);
+
+            // Drop buffered data so that the next read is served from the new channel position.
+            pos = 0;
+            count = 0;
+        }
+
+        // The stream now logically stands exactly at the requested offset, even if nothing gets read below.
+        lastReadPos = readPos;
+
+        int bytesRead = read(buf, off, len);
+
+        // Advance the tracked position only if we really read some bytes from the stream.
+        if (bytesRead > 0)
+            lastReadPos += bytesRead;
+
+        return bytesRead;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java
deleted file mode 100644
index ebf56ad..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.secondary.local;
-
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Positioned readable interface for local secondary file system.
- */
-public class LocalIgfsSecondaryFileSystemPositionedReadable extends BufferedInputStream
-    implements IgfsSecondaryFileSystemPositionedReadable {
-    /** Last read position. */
-    private long lastReadPos;
-
-    /**
-     * Constructor.
-     *
-     * @param in Input stream.
-     * @param bufSize Buffer size.
-     */
-    public LocalIgfsSecondaryFileSystemPositionedReadable(FileInputStream in, int bufSize) {
-        super(in, bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException {
-        if (in == null)
-            throw new IOException("Stream is closed.");
-
-        if (readPos < lastReadPos || readPos + len > lastReadPos + this.buf.length) {
-            ((FileInputStream)in).getChannel().position(readPos);
-
-            pos = 0;
-            count = 0;
-        }
-
-        int bytesRead = read(buf, off, len);
-
-        if (bytesRead != -1) {
-            // Advance last read position only if we really read some bytes from the stream.
-            lastReadPos = readPos + bytesRead;
-        }
-
-        return bytesRead;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 0158973..07a4fff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.resources.CacheNameResource;
 import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.FileSystemResource;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.JobContextResource;
 import org.apache.ignite.resources.LoadBalancerResource;
@@ -511,7 +512,10 @@ public class GridResourceIoc {
         JOB_CONTEXT(JobContextResource.class),
 
         /** */
-        CACHE_STORE_SESSION(CacheStoreSessionResource.class);
+        CACHE_STORE_SESSION(CacheStoreSessionResource.class),
+
+        /** */
+        FILESYSTEM_RESOURCE(FileSystemResource.class);
 
         /** */
         public final Class<? extends Annotation> clazz;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 84d07b6..bdfbe50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.resource;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.cache.store.CacheStoreSession;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobContext;
@@ -201,6 +201,26 @@ public class GridResourceProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Injects filesystem instance into given object.
+     *
+     * @param obj Object.
+     * @param igfs Ignite filesystem to inject.
+     * @return {@code True} if filesystem was injected.
+     * @throws IgniteCheckedException If failed to inject.
+     */
+    public boolean injectFileSystem(Object obj, IgniteFileSystem igfs) throws IgniteCheckedException {
+        assert obj != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Injecting file system: " + obj);
+
+        // Unwrap Proxy object.
+        obj = unwrapTarget(obj);
+
+        // No deployment and no deployment class are passed here, only the file system to inject.
+        return inject(obj, GridResourceIoc.ResourceAnnotation.FILESYSTEM_RESOURCE, null, null, igfs);
+    }
+
+    /**
      * @param obj Object to inject.
      * @throws IgniteCheckedException If failed to inject.
      */
@@ -308,6 +328,10 @@ public class GridResourceProcessor extends GridProcessorAdapter {
                 res = new GridResourceJobContextInjector((ComputeJobContext)param);
                 break;
 
+            case FILESYSTEM_RESOURCE:
+                res = new GridResourceBasicInjector<>(param);
+                break;
+
             default:
                 res = injectorByAnnotation[ann.ordinal()];
                 break;
@@ -318,6 +342,11 @@ public class GridResourceProcessor extends GridProcessorAdapter {
 
     /**
      * @param obj Object to inject.
+     * @param ann Annotation enum.
+     * @param dep Grid deployment object.
+     * @param depCls Grid deployment class.
+     * @param param Resource to inject.
+     * @return {@code True} if resource was injected.
      * @throws IgniteCheckedException If failed to inject.
      */
     private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep,

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java b/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java
new file mode 100644
index 0000000..e2aa06d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.resources;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotates a field or a setter method for injection of primary Ignite filesystem to a secondary
+ * filesystem implementation.
+ * <p>
+ * Here is how injection would typically happen:
+ * <pre name="code" class="java">
+ * public class MySecondaryFS implements IgfsSecondaryFileSystem {
+ *     ...
+ *     // Inject instance of primary filesystem.
+ *     &#64;FileSystemResource
+ *     private IgniteFileSystem igfs;
+ *     ...
+ * }
+ * </pre>
+ * or attach the same annotation to a setter method:
+ * <pre name="code" class="java">
+ * public class MySecondaryFS implements IgfsSecondaryFileSystem {
+ *     ...
+ *     private IgniteFileSystem igfs;
+ *     ...
+ *     // Inject instance of primary filesystem.
+ *     &#64;FileSystemResource
+ *     public void setIgfs(IgniteFileSystem igfs) {
+ *         this.igfs = igfs;
+ *     }
+ *     ...
+ * }
+ * </pre>
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.FIELD})
+public @interface FileSystemResource {
+    // No-op.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 04f3c8e..d0b700e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -736,7 +736,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest {
         if(!propertiesSupported())
             return;
 
-        if (dual && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) {
+        if (mode != PRIMARY && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) {
             // In case of Hadoop dual mode only user name, group name, and permission properties are updated,
             // an arbitrary named property is just ignored:
             checkRootPropertyUpdate("foo", "moo", null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
index bea318d..7b83cfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CyclicBarrier;
 
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
 
 /**
  * Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC.
@@ -48,8 +49,6 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
      */
     protected IgfsDualAbstractSelfTest(IgfsMode mode) {
         super(mode);
-
-        assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
     }
 
     /** {@inheritDoc} */
@@ -72,10 +71,13 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
             assert igfs.exists(p);
 
         assert igfs.modeResolver().resolveMode(gg) == mode;
-        assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC;
-        assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC;
-        assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY;
-        assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only.
+
+        if (mode != PROXY) {
+            assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC;
+            assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC;
+            assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY;
+            assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only.
+        }
 
         // All the child paths of "/ignite/" must be visible in listings:
         assert igfs.listFiles(gg).size() == 3;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
index e7f9bbb..e7e3ac8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -58,6 +59,11 @@ public class IgfsLocalSecondaryFileSystemProxySelfTest extends IgfsProxySelfTest
     /** */
     private final File fileLinkSrc = new File(FS_WORK_DIR + File.separatorChar + "file");
 
+    /** {@inheritDoc} */
+    @Override protected int nodeCount() {
+        return 3;
+    }
+
     /**
      * Creates secondary filesystems.
      * @return IgfsSecondaryFileSystem
@@ -215,6 +221,81 @@ public class IgfsLocalSecondaryFileSystemProxySelfTest extends IgfsProxySelfTest
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityMaxLen() throws Exception {
+        // Checks affinity() block locations of a 32 MB file: first with the default/unlimited maxLen,
+        // then with a shrinking maxLen combined with a sliding [start, start + len) window.
+        awaitPartitionMapExchange();
+
+        long fileSize = 32 * 1024 * 1024;
+
+        IgfsPath filePath = new IgfsPath("/file");
+
+        try (OutputStream os = igfs.create(filePath, true)) {
+            for(int i = 0; i < fileSize / chunk.length; ++i)
+                os.write(chunk);
+        }
+
+        Collection<IgfsBlockLocation> blocks;
+
+        long len = igfs.info(filePath).length();
+        int start = 0;
+
+        // Check default maxLen (maxLen = 0)
+        for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) {
+            Collection<IgfsBlockLocation> blocks0 =
+                igfs.affinity(filePath, start, len, 0);
+
+            blocks = igfs.affinity(filePath, start, len, Long.MAX_VALUE);
+
+            // maxLen = 0 must behave exactly like an unlimited maxLen.
+            assertTrue(blocks0.size() > 1);
+            assertEquals(blocks0.size(), blocks.size());
+            assertEquals(start, F.first(blocks).start());
+            assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length());
+            assertEquals(blocks0, blocks);
+
+            // Shrink the window from both sides.
+            len -= 1024 * 2;
+            start += 1024;
+        }
+
+        len = igfs.info(filePath).length();
+        start = 0;
+        long maxLen = igfs.context().data().groupBlockSize() * 2;
+
+        // Different cases of start, len and maxLen
+        for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) {
+            blocks = igfs.affinity(filePath, start, len, maxLen);
+
+            assertEquals(start, F.first(blocks).start());
+            assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length());
+
+            long totalLen = 0;
+
+            for (IgfsBlockLocation block : blocks) {
+                totalLen += block.length();
+
+                assert block.length() <= maxLen : "block.length() <= maxLen. [block.length=" + block.length()
+                    + ", maxLen=" + maxLen + ']';
+
+                assert block.length() + block.start() <= start + len :
+                    "block.length() + block.start() <= start + len. [block.length=" + block.length()
+                    + ", block.start()=" + block.start() + ", start=" + start +", len=" + len + ']';
+
+                // Every other block must lie completely before or completely after this one.
+                for (IgfsBlockLocation block0 : blocks) {
+                    if (!block0.equals(block))
+                        assert block.start() < block0.start() && block.start() + block.length() <= block0.start() ||
+                            block.start() > block0.start() && block0.start() + block0.length() <= block.start()
+                            : "Blocks cross each other: block=" + block + ", block0=" + block0;
+                }
+            }
+
+            assert totalLen == len : "Summary length of blocks must be: " + len + " actual: " + totalLen;
+
+            len -= 1024 * 2;
+            start += 1024;
+            maxLen -= igfs.context().data().groupBlockSize() * 2 / 1024;
+        }
+    }
+
+    /**
      *
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
new file mode 100644
index 0000000..d4187cb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.resources.FileSystemResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests for resource injection to secondary file system.
+ */
+public class IgfsSecondaryFileSystemInjectionSelfTest extends GridCommonAbstractTest {
+    /** IGFS name. */
+    protected static final String IGFS_NAME = "igfs-test";
+
+    /** Test implementation of secondary filesystem. */
+    private TestBaseSecondaryFsMock secondary;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        G.stopAll(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(IGFS_NAME);
+        igfsCfg.setDefaultMode(IgfsMode.DUAL_SYNC);
+        igfsCfg.setSecondaryFileSystem(secondary);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        cfg.setFileSystemConfiguration(igfsCfg);
+        cfg.setCacheConfiguration(metaCacheCfg, dataCacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"UnusedDeclaration"})
+    public void testInjectPrimaryByField() throws Exception {
+        secondary = new TestBaseSecondaryFsMock() {
+            @FileSystemResource
+            private IgfsImpl igfs;
+
+            @LoggerResource
+            private IgniteLogger log;
+
+            @IgniteInstanceResource
+            private Ignite ig;
+
+            @Override boolean checkInjection(Ignite ignite, IgniteFileSystem primary) {
+                return igfs == primary && log instanceof IgniteLogger && ig == ignite;
+            }
+        };
+
+        Ignite ig = startGrid(0);
+
+        IgniteFileSystem igfs = ig.fileSystem(IGFS_NAME);
+
+        assert secondary.checkInjection(ig, igfs);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"UnusedDeclaration"})
+    public void testInjectPrimaryByMethods() throws Exception {
+        secondary = new TestBaseSecondaryFsMock() {
+            /** Ignite instance. */
+            private Ignite ig;
+
+            /** IGFS instance. */
+            private IgniteFileSystem igfs;
+
+            /** Logger injected flag */
+            private boolean logSet;
+
+            /**
+             * @param igfs Primary IGFS.
+             */
+            @FileSystemResource
+            void setPrimaryIgfs(IgfsImpl igfs) {
+                this.igfs = igfs;
+            }
+
+            /**
+             * @param log Ignite logger.
+             */
+            @LoggerResource
+            void setIgLogger(IgniteLogger log) {
+                logSet = log instanceof IgniteLogger;
+            }
+
+            /**
+             * @param ig Ignite instance.
+             */
+            @IgniteInstanceResource
+            void setIgniteInst(Ignite ig) {
+                this.ig = ig;
+            }
+
+            @Override boolean checkInjection(Ignite ignite, IgniteFileSystem primary) {
+                return ignite == ig && primary == igfs && logSet;
+            }
+        };
+
+        Ignite ig = startGrid(0);
+
+        IgniteFileSystem igfs = ig.fileSystem(IGFS_NAME);
+
+        assert secondary.checkInjection(ig, igfs);
+    }
+
+    /**
+     * Base secondary file system stub for injection tests: every file system operation is a no-op or
+     * returns an empty value ({@code null}, {@code false} or {@code 0}). Subclasses declare the injection
+     * targets and verify them in {@link #checkInjection(Ignite, IgniteFileSystem)}.
+     */
+    private static abstract class TestBaseSecondaryFsMock implements IgfsSecondaryFileSystem {
+
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void rename(IgfsPath src, IgfsPath dest) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean delete(IgfsPath path, boolean recursive) throws IgniteException {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void mkdirs(IgfsPath path) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public OutputStream create(IgfsPath path, boolean overwrite) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, long blockSize,
+            @Nullable Map<String, String> props) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+            @Nullable Map<String, String> props) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsFile info(IgfsPath path) throws IgniteException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long usedSpaceSize() throws IgniteException {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+            long maxLen) throws IgniteException {
+            return null;
+        }
+
+        /**
+         * @param ignite Ignite instance.
+         * @param primary Primary IGFS.
+         * @return {@code True} if injection is correct.
+         */
+        abstract boolean checkInjection(Ignite ignite, IgniteFileSystem primary);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
index 775c2ce..76ed440 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsProcessorValidationSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsProxySelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsLocalSecondaryFileSystemProxySelfTest;
+import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemInjectionSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsSizeSelfTest;
 import org.apache.ignite.internal.processors.igfs.IgfsStartCacheTest;
@@ -167,6 +168,8 @@ public class IgniteIgfsTestSuite extends TestSuite {
         suite.addTestSuite(IgfsAtomicPrimaryOffheapTieredSelfTest.class);
         suite.addTestSuite(IgfsAtomicPrimaryOffheapValuesSelfTest.class);
 
+        suite.addTestSuite(IgfsSecondaryFileSystemInjectionSelfTest.class);
+
         return suite;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index d9215db..674cca7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,20 +20,22 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.IgfsUserContext;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
 import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
 import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
-import org.apache.ignite.internal.processors.igfs.IgfsKernalContextAware;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.Nullable;
 
 import java.io.OutputStream;
@@ -46,8 +48,8 @@ import java.util.concurrent.Callable;
  * <p>
  * Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}.
  */
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, IgfsKernalContextAware,
-    LifecycleAware, HadoopPayloadAware {
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware,
+    HadoopPayloadAware {
     /** The default user name. It is used if no user context is set. */
     private String dfltUsrName;
 
@@ -245,8 +247,17 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     }
 
     /** {@inheritDoc} */
-    @Override public void setKernalContext(GridKernalContext ctx) {
-        this.ctx = ctx;
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+        long maxLen) throws IgniteException {
+        return target.affinity(path, start, len, maxLen);
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     */
+    @IgniteInstanceResource
+    public void setIgniteInstance(IgniteEx ignite) {
+        ctx = ignite.context();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
index e336fad..fe6492e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl.delegate;
 
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -28,6 +31,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
 import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
@@ -42,6 +46,7 @@ import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFac
 import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
 import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProperties;
 import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
 import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
 import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -104,12 +109,17 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
 
         final FileSystem fileSys = fileSystemForUser();
 
+        Path hadoopPath = convert(path);
+
         try {
+            if (!fileSys.exists(hadoopPath))
+                return null;
+
             if (props0.userName() != null || props0.groupName() != null)
-                fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+                fileSys.setOwner(hadoopPath, props0.userName(), props0.groupName());
 
             if (props0.permission() != null)
-                fileSys.setPermission(convert(path), props0.permission());
+                fileSys.setPermission(hadoopPath, props0.permission());
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
@@ -266,7 +276,14 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
     @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
         @Nullable Map<String, String> props) {
         try {
-            return fileSystemForUser().append(convert(path), bufSize);
+            Path hadoopPath = convert(path);
+
+            FileSystem fs = fileSystemForUser();
+
+            if (create && !fs.exists(hadoopPath))
+                return fs.create(hadoopPath, false, bufSize);
+            else
+                return fs.append(convert(path), bufSize);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -371,6 +388,24 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
     }
 
     /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+        long maxLen) throws IgniteException {
+        try {
+            BlockLocation[] hadoopBlocks = fileSystemForUser().getFileBlockLocations(convert(path), start, len);
+
+            List<IgfsBlockLocation> blks = new ArrayList<>(hadoopBlocks.length);
+
+            for (int i = 0; i < hadoopBlocks.length; ++i)
+                blks.add(convertBlockLocation(hadoopBlocks[i]));
+
+            return blks;
+        }
+        catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed affinity for path: " + path);
+        }
+    }
+
+    /** {@inheritDoc} */
     public void start() {
         factory.start();
     }
@@ -393,6 +428,25 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
     }
 
     /**
+     * Convert Hadoop affinity block location into IGFS affinity block location.
+     *
+     * @param block Hadoop affinity block location.
+     * @return IGFS affinity block location.
+     */
+    private IgfsBlockLocation convertBlockLocation(BlockLocation block) {
+        try {
+            String[] names = block.getNames();
+            String[] hosts = block.getHosts();
+
+            return new IgfsBlockLocationImpl(
+                block.getOffset(), block.getLength(),
+                Arrays.asList(names), Arrays.asList(hosts));
+        } catch (IOException e) {
+            throw handleSecondaryFsError(e, "Failed convert block location: " + block);
+        }
+    }
+
+    /**
      * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
      *
      * @param e Exception to check.
@@ -406,6 +460,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
     /**
      * Cast IO exception to IGFS exception.
      *
+     * @param msg Error message.
      * @param e IO exception.
      * @return IGFS exception.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java
new file mode 100644
index 0000000..c7c792d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import java.io.OutputStream;
+import java.util.Collection;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * PROXY mode test.
+ */
+public class Hadoop1OverIgfsProxyTest extends Hadoop1DualAbstractTest {
+    /**
+     * Constructor.
+     */
+    public Hadoop1OverIgfsProxyTest() {
+        super(IgfsMode.PROXY);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinity() throws Exception {
+        long fileSize = 32 * 1024 * 1024;
+
+        IgfsPath filePath = new IgfsPath("/file");
+
+        try (OutputStream os = igfs.create(filePath, true)) {
+            for(int i = 0; i < fileSize / chunk.length; ++i)
+                os.write(chunk);
+        }
+
+        long len = igfs.info(filePath).length();
+        int start = 0;
+
+        // Check default maxLen (maxLen = 0)
+        for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) {
+            Collection<IgfsBlockLocation> blocks = igfs.affinity(filePath, start, len);
+
+            assertEquals(F.first(blocks).start(), start);
+            assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length());
+
+            len -= 1024 * 2;
+            start += 1024;
+        }
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 6046cc1..01893fb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProt
 import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopTxConfigCacheTest;
 import org.apache.ignite.internal.processors.hadoop.impl.fs.KerberosHadoopFileSystemFactorySelfTest;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.Hadoop1OverIgfsProxyTest;
 import org.apache.ignite.internal.processors.hadoop.impl.util.BasicUserNameMapperSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.util.ChainedUserNameMapperSelfTest;
 import org.apache.ignite.internal.processors.hadoop.impl.util.KerberosUserNameMapperSelfTest;
@@ -136,6 +137,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsProxyTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName())));
 


Mime
View raw message