ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [3/8] ignite git commit: IGNITE-3859: IGFS: Support direct PROXY mode invocation in the open method, add proxy mode to IgfsInputStreamImpl This closes #1065. This closes #1083.
Date Wed, 21 Sep 2016 08:58:26 GMT
IGNITE-3859:  IGFS:  Support direct PROXY mode invocation in the open method, add proxy mode
to IgfsInputStreamImpl
This closes #1065. This closes #1083.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a35ee9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a35ee9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a35ee9d

Branch: refs/heads/master
Commit: 5a35ee9dad194b3009151b79f0ebd3976bb8fd22
Parents: 2474e2b
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Tue Sep 20 14:10:55 2016 +0500
Committer: tledkov-gridgain <tledkov@gridgain.com>
Committed: Tue Sep 20 14:10:55 2016 +0500

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsContext.java   |  35 ++++++
 .../processors/igfs/IgfsDataManager.java        | 121 ++++++++-----------
 .../internal/processors/igfs/IgfsImpl.java      |  82 ++++++++++---
 .../processors/igfs/IgfsInputStreamImpl.java    | 103 +++++++++++-----
 4 files changed, 226 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3e01246..3405b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.igfs;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
@@ -60,6 +63,12 @@ public class IgfsContext {
     /** Local cluster node. */
     private volatile ClusterNode locNode;
 
+    /** IGFS executor service. */
+    private ExecutorService igfsSvc;
+
+    /** Logger. */
+    protected IgniteLogger log;
+
     /**
      * @param ctx Kernal context.
      * @param cfg IGFS configuration.
@@ -85,6 +94,10 @@ public class IgfsContext {
         this.srvMgr = add(srvMgr);
         this.fragmentizerMgr = add(fragmentizerMgr);
 
+        log = ctx.log(IgfsContext.class);
+
+        igfsSvc = ctx.getIgfsExecutorService();
+
         igfs = new IgfsImpl(this);
     }
 
@@ -206,6 +219,28 @@ public class IgfsContext {
     }
 
     /**
+     * Executes runnable in IGFS executor service. If execution rejected, runnable will be executed
+     * in caller thread.
+     *
+     * @param r Runnable to execute.
+     */
+    public void runInIgfsThreadPool(Runnable r) {
+        try {
+            igfsSvc.submit(r);
+        }
+        catch (RejectedExecutionException ignored) {
+            // This exception will happen if network speed is too low and data comes faster
+            // than we can send it to remote nodes.
+            try {
+                r.run();
+            }
+            catch (Exception e) {
+                log.warning("Failed to execute IGFS runnable: " + r, e);
+            }
+        }
+    }
+
+    /**
      * Adds manager to managers list.
      *
      * @param mgr Manager.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d2183f9..2f704ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -74,12 +73,9 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -123,9 +119,6 @@ public class IgfsDataManager extends IgfsManager {
     /** Affinity key generator. */
     private AtomicLong affKeyGen = new AtomicLong();
 
-    /** IGFS executor service. */
-    private ExecutorService igfsSvc;
-
     /** Request ID counter for write messages. */
     private AtomicLong reqIdCtr = new AtomicLong();
 
@@ -183,8 +176,6 @@ public class IgfsDataManager extends IgfsManager {
             }
         }, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
-
         delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
             "igfs-" + igfsName + "-delete-worker", log);
     }
@@ -345,45 +336,11 @@ public class IgfsDataManager extends IgfsManager {
 
                         if (oldRmtReadFut == null) {
                             try {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reading non-local data block in the secondary file system [path=" +
-                                        path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']');
-
-                                int blockSize = fileInfo.blockSize();
-
-                                long pos = blockIdx * blockSize; // Calculate position for Hadoop
-
-                                res = new byte[blockSize];
-
-                                int read = 0;
-
-                                synchronized (secReader) {
-                                    try {
-                                        // Delegate to the secondary file system.
-                                        while (read < blockSize) {
-                                            int r = secReader.read(pos + read, res, read, blockSize - read);
-
-                                            if (r < 0)
-                                                break;
-
-                                            read += r;
-                                        }
-                                    }
-                                    catch (IOException e) {
-                                        throw new IgniteCheckedException("Failed to read data due to secondary file system " +
-                                            "exception: " + e.getMessage(), e);
-                                    }
-                                }
-
-                                // If we did not read full block at the end of the file - trim it.
-                                if (read != blockSize)
-                                    res = Arrays.copyOf(res, read);
+                                res = secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize());
 
                                 rmtReadFut.onDone(res);
 
                                 putBlock(fileInfo.blockSize(), key, res);
-
-                                igfsCtx.metrics().addReadBlocks(1, 1);
                             }
                             catch (IgniteCheckedException e) {
                                 rmtReadFut.onDone(e);
@@ -417,11 +374,59 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
+     * Get data block for specified block index from secondary reader.
+     *
+     * @param path Path reading from.
+     * @param blockIdx Block index.
+     * @param secReader Optional secondary file system reader.
+     * @param blockSize Block size.
+     * @return Requested data block or {@code null} if nothing found.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public byte[] secondaryDataBlock(IgfsPath path, long blockIdx,
+        IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Reading non-local data block in the secondary file system [path=" +
+                path + ", blockIdx=" + blockIdx + ']');
+
+        long pos = blockIdx * blockSize; // Calculate position for Hadoop
+
+        byte[] res = new byte[blockSize];
+
+        int read = 0;
+
+        try {
+            // Delegate to the secondary file system.
+            while (read < blockSize) {
+                int r = secReader.read(pos + read, res, read, blockSize - read);
+
+                if (r < 0)
+                    break;
+
+                read += r;
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to read data due to secondary file system " +
+                "exception: " + e.getMessage(), e);
+        }
+
+        // If we did not read full block at the end of the file - trim it.
+        if (read != blockSize)
+            res = Arrays.copyOf(res, read);
+
+        igfsCtx.metrics().addReadBlocks(1, 1);
+
+        return res;
+    }
+
+    /**
      * Stores the given block in data cache.
      *
      * @param blockSize The size of the block.
      * @param key The data cache key of the block.
      * @param data The new value of the block.
+     * @throws IgniteCheckedException If failed.
      */
     private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
         if (data.length < blockSize)
@@ -967,8 +972,8 @@ public class IgfsDataManager extends IgfsManager {
             }
         }
         else {
-            callIgfsLocalSafe(new GridPlainCallable<Object>() {
-                @Override @Nullable public Object call() throws Exception {
+            igfsCtx.runInIgfsThreadPool(new Runnable() {
+                @Override public void run() {
                     storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() {
                         @Override public void apply(IgniteInternalFuture<?> fut) {
                             try {
@@ -981,8 +986,6 @@ public class IgfsDataManager extends IgfsManager {
                             }
                         }
                     });
-
-                    return null;
                 }
             });
         }
@@ -1070,28 +1073,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Executes callable in IGFS executor service. If execution rejected, callable will be executed
-     * in caller thread.
-     *
-     * @param c Callable to execute.
-     */
-    private <T> void callIgfsLocalSafe(Callable<T> c) {
-        try {
-            igfsSvc.submit(c);
-        }
-        catch (RejectedExecutionException ignored) {
-            // This exception will happen if network speed is too low and data comes faster
-            // than we can send it to remote nodes.
-            try {
-                c.call();
-            }
-            catch (Exception e) {
-                log.warning("Failed to execute IGFS callable: " + c, e);
-            }
-        }
-    }
-
-    /**
      * @param blocks Blocks to write.
      * @return Future that will be completed after put is done.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 45596a3..87a4699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.igfs.IgfsPathSummary;
 import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -949,34 +950,79 @@ public final class IgfsImpl implements IgfsEx {
 
                 IgfsMode mode = resolveMode(path);
 
-                if (mode != PRIMARY) {
-                    assert IgfsUtils.isDualMode(mode);
+                switch (mode) {
+                    case PRIMARY: {
+                        IgfsEntryInfo info = meta.infoForPath(path);
 
-                    IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+                        if (info == null)
+                            throw new IgfsPathNotFoundException("File not found: " + path);
 
-                    IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(),
-                        cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader());
+                        if (!info.isFile())
+                            throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
 
-                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+                        // Input stream to read data from grid cache with separate blocks.
+                        IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+                            cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null,
+                            info.length(), info.blockSize(), info.blocksCount(), false);
 
-                    return os;
-                }
+                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
 
-                IgfsEntryInfo info = meta.infoForPath(path);
+                        return os;
+                    }
 
-                if (info == null)
-                    throw new IgfsPathNotFoundException("File not found: " + path);
+                    case DUAL_ASYNC:
+                    case DUAL_SYNC: {
+                        assert IgfsUtils.isDualMode(mode);
 
-                if (!info.isFile())
-                    throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+                        IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+
+                        IgfsEntryInfo info = desc.info();
 
-                // Input stream to read data from grid cache with separate blocks.
-                IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
-                    cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null);
+                        IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+                            cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(),
+                            info.length(), info.blockSize(), info.blocksCount(), false);
+
+                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+                        return os;
+                    }
 
-                IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+                    case PROXY: {
+                        assert secondaryFs != null;
 
-                return os;
+                        IgfsFile info = info(path);
+
+                        if (info == null)
+                            throw new IgfsPathNotFoundException("File not found: " + path);
+
+                        if (!info.isFile())
+                            throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+
+                        IgfsSecondaryFileSystemPositionedReadable secReader =
+                            new IgfsLazySecondaryFileSystemPositionedReadable(secondaryFs, path, bufSize);
+
+                        long len = info.length();
+
+                        int blockSize = info.blockSize() > 0 ? info.blockSize() : cfg.getBlockSize();
+
+                        long blockCnt = len / blockSize;
+
+                        if (len % blockSize != 0)
+                            blockCnt++;
+
+                        IgfsInputStream os = new IgfsInputStreamImpl(igfsCtx, path, null,
+                            cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, secReader,
+                            info.length(), blockSize, blockCnt, true);
+
+                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+                        return os;
+                    }
+
+                    default:
+                        assert false : "Unexpected mode " + mode;
+                        return null;
+                }
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index 2f9f2fc..0d9f2cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -109,21 +110,44 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     /** Time consumed on reading. */
     private long time;
 
+    /** File Length. */
+    private long len;
+
+    /** Block size to read. */
+    private int blockSize;
+
+    /** Block size to read. */
+    private long blocksCnt;
+
+    /** Proxy mode. */
+    private boolean proxy;
+
     /**
      * Constructs file output stream.
-     *
-     * @param igfsCtx IGFS context.
+     *  @param igfsCtx IGFS context.
      * @param path Path to stored file.
      * @param fileInfo File info to write binary data to.
      * @param prefetchBlocks Number of blocks to prefetch.
      * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
      * @param secReader Optional secondary file system reader.
+     * @param len File length.
+     * @param blockSize Block size.
+     * @param blocksCnt Blocks count.
+     * @param proxy Proxy mode flag.
      */
-    IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks,
-        int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) {
+    IgfsInputStreamImpl(
+        IgfsContext igfsCtx,
+        IgfsPath path,
+        @Nullable IgfsEntryInfo fileInfo,
+        int prefetchBlocks,
+        int seqReadsBeforePrefetch,
+        @Nullable IgfsSecondaryFileSystemPositionedReadable secReader,
+        long len,
+        int blockSize,
+        long blocksCnt,
+        boolean proxy) {
         assert igfsCtx != null;
         assert path != null;
-        assert fileInfo != null;
 
         this.igfsCtx = igfsCtx;
         this.path = path;
@@ -131,6 +155,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         this.prefetchBlocks = prefetchBlocks;
         this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
         this.secReader = secReader;
+        this.len = len;
+        this.blockSize = blockSize;
+        this.blocksCnt = blocksCnt;
+        this.proxy = proxy;
 
         log = igfsCtx.kernalContext().log(IgfsInputStream.class);
 
@@ -154,7 +182,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
     /** {@inheritDoc} */
     @Override public long length() {
-        return fileInfo.length();
+        return len;
     }
 
     /** {@inheritDoc} */
@@ -195,7 +223,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
     /** {@inheritDoc} */
     @Override public synchronized int available() throws IOException {
-        long l = fileInfo.length() - pos;
+        long l = len - pos;
 
         if (l < 0)
             return 0;
@@ -240,7 +268,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     @SuppressWarnings("IfMayBeConditional")
     public synchronized byte[][] readChunks(long pos, int len) throws IOException {
         // Readable bytes in the file, starting from the specified position.
-        long readable = fileInfo.length() - pos;
+        long readable = this.len - pos;
 
         if (readable <= 0)
             return EMPTY_CHUNKS;
@@ -254,8 +282,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
         bytes += len;
 
-        int start = (int)(pos / fileInfo.blockSize());
-        int end = (int)((pos + len - 1) / fileInfo.blockSize());
+        int start = (int)(pos / blockSize);
+        int end = (int)((pos + len - 1) / blockSize);
 
         int chunkCnt = end - start + 1;
 
@@ -264,7 +292,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         for (int i = 0; i < chunkCnt; i++) {
             byte[] block = blockFragmentizerSafe(start + i);
 
-            int blockOff = (int)(pos % fileInfo.blockSize());
+            int blockOff = (int)(pos % blockSize);
             int blockLen = Math.min(len, block.length - blockOff);
 
             // If whole block can be used as result, do not do array copy.
@@ -366,7 +394,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
             return 0; // Fully read done: read zero bytes correctly.
 
         // Readable bytes in the file, starting from the specified position.
-        long readable = fileInfo.length() - pos;
+        long readable = this.len - pos;
 
         if (readable <= 0)
             return -1; // EOF.
@@ -378,10 +406,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
         assert len > 0;
 
-        byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize());
+        byte[] block = blockFragmentizerSafe(pos / blockSize);
 
         // Skip bytes to expected position.
-        int blockOff = (int)(pos % fileInfo.blockSize());
+        int blockOff = (int)(pos % blockSize);
 
         len = Math.min(len, block.length - blockOff);
 
@@ -412,7 +440,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
                         ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']');
 
                 // This failure may be caused by file being fragmented.
-                if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
+                if (fileInfo != null && fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
                     IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id());
 
                     // File was deleted.
@@ -459,7 +487,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
             prevBlockIdx = blockIdx;
 
-            bytesFut = dataBlock(fileInfo, blockIdx);
+            bytesFut = dataBlock(blockIdx);
 
             assert bytesFut != null;
 
@@ -470,10 +498,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) {
             for (int i = 1; i <= prefetchBlocks; i++) {
                 // Ensure that we do not prefetch over file size.
-                if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length())
+                if (blockSize * (i + blockIdx) >= len)
                     break;
                 else if (locCache.get(blockIdx + i) == null)
-                    addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i));
+                    addLocalCacheFuture(blockIdx + i, dataBlock(blockIdx + i));
             }
         }
 
@@ -483,17 +511,17 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
             throw new IgfsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " +
                 "[path=" + path + ", blockIdx=" + blockIdx + ']');
 
-        int blockSize = fileInfo.blockSize();
+        int blockSize0 = blockSize;
 
-        if (blockIdx == fileInfo.blocksCount() - 1)
-            blockSize = (int)(fileInfo.length() % blockSize);
+        if (blockIdx == blocksCnt - 1)
+            blockSize0 = (int)(len % blockSize0);
 
         // If part of the file was reserved for writing, but was not actually written.
-        if (bytes.length < blockSize)
+        if (bytes.length < blockSize0)
             throw new IOException("Inconsistent file's data block (incorrectly written?)" +
                 " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length +
-                ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() +
-                ", fileLen=" + fileInfo.length() + ']');
+                ", expectedBlockSize=" + blockSize0 + ", fileBlockSize=" + blockSize +
+                ", fileLen=" + len + ']');
 
         return bytes;
     }
@@ -538,14 +566,35 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     /**
      * Get data block for specified block index.
      *
-     * @param fileInfo File info.
      * @param blockIdx Block index.
      * @return Requested data block or {@code null} if nothing found.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx)
+    @Nullable protected IgniteInternalFuture<byte[]> dataBlock(final long blockIdx)
         throws IgniteCheckedException {
-        return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+        if (proxy) {
+            assert secReader != null;
+
+            final GridFutureAdapter<byte[]> fut = new GridFutureAdapter<>();
+
+            igfsCtx.runInIgfsThreadPool(new Runnable() {
+                @Override public void run() {
+                    try {
+                        fut.onDone(igfsCtx.data().secondaryDataBlock(path, blockIdx, secReader, blockSize));
+                    }
+                    catch (Throwable e) {
+                        fut.onDone(null, e);
+                    }
+                }
+            });
+
+            return fut;
+        }
+        else {
+            assert fileInfo != null;
+
+            return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+        }
     }
 
     /** {@inheritDoc} */


Mime
View raw message