ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [04/57] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 10:54:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInputStreamImpl.java
new file mode 100644
index 0000000..95d3a00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInputStreamImpl.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Input stream to read data from grid cache with separate blocks.
+ */
+public class IgfsInputStreamImpl extends IgfsInputStreamAdapter {
+    /** Empty chunks result. */
+    private static final byte[][] EMPTY_CHUNKS = new byte[0][];
+
+    /** Meta manager. */
+    private final IgfsMetaManager meta;
+
+    /** Data manager. */
+    private final IgfsDataManager data;
+
+    /** Secondary file system reader; may be {@code null}. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private final IgniteFsReader secReader;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Path to file. */
+    protected final IgniteFsPath path;
+
+    /** File descriptor. Replaced with a fresh copy when a block read fails on a fragmented file. */
+    private volatile IgfsFileInfo fileInfo;
+
+    /** The number of already read bytes. Important! Access to the property is guarded by this object lock. */
+    private long pos;
+
+    /** Local cache of block read futures keyed by block index. */
+    private final Map<Long, IgniteInternalFuture<byte[]>> locCache;
+
+    /** Maximum local cache size. */
+    private final int maxLocCacheSize;
+
+    /** Pending data read futures which were evicted from the local cache before completion. */
+    private final Set<IgniteInternalFuture<byte[]>> pendingFuts;
+
+    /** Pending futures lock. */
+    private final Lock pendingFutsLock = new ReentrantLock();
+
+    /** Pending futures condition. */
+    private final Condition pendingFutsCond = pendingFutsLock.newCondition();
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /** Number of blocks to prefetch asynchronously. */
+    private int prefetchBlocks;
+
+    /** Number of blocks that must be read sequentially before prefetch is triggered. */
+    private int seqReadsBeforePrefetch;
+
+    /** Bytes read. */
+    private long bytes;
+
+    /** Index of the previously read block. Initially it is set to -1 indicating that no reads have been made so far. */
+    private long prevBlockIdx = -1;
+
+    /** Amount of sequential reads performed. */
+    private int seqReads;
+
+    /** Time consumed on reading, accumulated from {@code System.nanoTime()} differences. */
+    private long time;
+
+    /** Local GGFS metrics. */
+    private final IgfsLocalMetrics metrics;
+
+    /**
+     * Creates an input stream over a file stored in GGFS.
+     *
+     * @param ggfsCtx GGFS context; supplies the meta manager, the data manager and the logger.
+     * @param path Path to the file being read.
+     * @param fileInfo Descriptor of the file to read data from.
+     * @param prefetchBlocks Number of blocks to prefetch.
+     * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
+     * @param secReader Optional secondary file system reader.
+     * @param metrics Local GGFS metrics.
+     */
+    IgfsInputStreamImpl(IgfsContext ggfsCtx, IgniteFsPath path, IgfsFileInfo fileInfo, int prefetchBlocks,
+        int seqReadsBeforePrefetch, @Nullable IgniteFsReader secReader, IgfsLocalMetrics metrics) {
+        assert ggfsCtx != null;
+        assert path != null;
+        assert fileInfo != null;
+        assert metrics != null;
+
+        this.path = path;
+        this.fileInfo = fileInfo;
+        this.prefetchBlocks = prefetchBlocks;
+        this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
+        this.secReader = secReader;
+        this.metrics = metrics;
+
+        meta = ggfsCtx.meta();
+        data = ggfsCtx.data();
+
+        log = ggfsCtx.kernalContext().log(IgniteFsInputStream.class);
+
+        // Size internal structures for at least one block, even when prefetch is disabled.
+        int sizingBlocks = prefetchBlocks > 0 ? prefetchBlocks : 1;
+
+        maxLocCacheSize = sizingBlocks * 3 / 2;
+
+        // No access-order flag is passed, so the map iterates in insertion order.
+        locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f);
+
+        pendingFuts = new GridConcurrentHashSet<>(sizingBlocks);
+    }
+
+    /**
+     * Gets the value of the bytes-read counter, which is increased by the read methods of this stream.
+     *
+     * @return Bytes read.
+     */
+    public synchronized long bytes() {
+        return bytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the current descriptor held by this stream; the field is volatile and is read without locking.
+     */
+    @Override public IgfsFileInfo fileInfo() {
+        return fileInfo;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@link #read(byte[], int, int)} with a one-byte buffer.
+     */
+    @Override public synchronized int read() throws IOException {
+        byte[] single = new byte[1];
+
+        // -1 signals EOF; otherwise the signed byte is widened to its *unsigned* value.
+        return read(single, 0, 1) == -1 ? -1 : single[0] & 0xFF;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads from the current stream position and advances it by the number of bytes actually read.
+     */
+    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
+        int cnt = readFromStore(pos, b, off, len);
+
+        if (cnt == -1)
+            return -1; // EOF: position stays untouched.
+
+        pos += cnt;
+
+        return cnt;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only negative positions are rejected; the new position is not compared against the file length here.
+     *
+     * @throws IOException If {@code pos} is negative.
+     */
+    @Override public synchronized void seek(long pos) throws IOException {
+        if (pos < 0)
+            throw new IOException("Seek position cannot be negative: " + pos);
+
+
+        this.pos = pos;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the position maintained by {@code seek} and the sequential {@code read} methods.
+     */
+    @Override public synchronized long position() throws IOException {
+        return pos;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Computed as file length minus the current position, clamped to the {@code [0, Integer.MAX_VALUE]} range.
+     */
+    @Override public synchronized int available() throws IOException {
+        long remaining = fileInfo.length() - pos;
+
+        // Position may have been moved past the end of the file by seek().
+        return remaining <= 0 ? 0 : (int)Math.min(remaining, Integer.MAX_VALUE);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Fills the whole buffer; shortcut for {@code readFully(pos, buf, 0, buf.length)}.
+     */
+    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
+        readFully(pos, buf, 0, buf.length);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Repeats partial reads until exactly {@code len} bytes are copied. The stream position is not changed.
+     *
+     * @throws EOFException If the file ends before {@code len} bytes could be read.
+     */
+    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+        int done = 0;
+
+        while (done < len) {
+            int cnt = readFromStore(pos + done, buf, off + done, len - done);
+
+            if (cnt == -1)
+                throw new EOFException("Failed to read stream fully (stream ends unexpectedly)" +
+                    "[pos=" + pos + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
+
+            done += cnt;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Positional read: data is read starting at {@code pos} and the stream position is left unchanged.
+     */
+    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+        return readFromStore(pos, buf, off, len);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the requested range as one array per touched file block. The stream position is not changed.
+     * When a block is needed in full, the array obtained for that block is returned as is (no copy);
+     * NOTE(review): callers presumably must not modify such arrays - confirm against the data manager.
+     *
+     * @return Chunks covering the range, or an empty array if {@code pos} is at or beyond the end of the file.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    @Override public synchronized byte[][] readChunks(long pos, int len) throws IOException {
+        // Readable bytes in the file, starting from the specified position.
+        long readable = fileInfo.length() - pos;
+
+        if (readable <= 0)
+            return EMPTY_CHUNKS;
+
+        long startTime = System.nanoTime();
+
+        if (readable < len)
+            len = (int)readable; // Truncate expected length to available.
+
+        assert len > 0;
+
+        // Counter is updated up front, before any block is actually fetched.
+        bytes += len;
+
+        // Indexes of the first and the last blocks touched by the requested range.
+        int start = (int)(pos / fileInfo.blockSize());
+        int end = (int)((pos + len - 1) / fileInfo.blockSize());
+
+        int chunkCnt = end - start + 1;
+
+        byte[][] chunks = new byte[chunkCnt][];
+
+        for (int i = 0; i < chunkCnt; i++) {
+            byte[] block = blockFragmentizerSafe(start + i);
+
+            // 'pos' is advanced at the end of each iteration, so the offset is non-zero only for the first block.
+            int blockOff = (int)(pos % fileInfo.blockSize());
+            int blockLen = Math.min(len, block.length - blockOff);
+
+            // If whole block can be used as result, do not do array copy.
+            if (blockLen == block.length)
+                chunks[i] = block;
+            else {
+                // Only first or last block can have non-full data.
+                assert i == 0 || i == chunkCnt - 1;
+
+                chunks[i] = Arrays.copyOfRange(block, blockOff, blockOff + blockLen);
+            }
+
+            len -= blockLen;
+            pos += blockLen;
+        }
+
+        // Every requested byte must have been assigned to some chunk.
+        assert len == 0;
+
+        time += System.nanoTime() - startTime;
+
+        return chunks;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * When a secondary file system reader is present, it is closed first, then all block futures (both cached
+     * and evicted-but-pending ones) are awaited, and finally file data is deleted if the file no longer
+     * exists in meta. Read metrics are reported exactly once: repeated calls have no effect, as required by
+     * the {@link java.io.Closeable#close()} contract.
+     * <p>
+     * If the calling thread is interrupted while waiting for pending futures, waiting continues and the
+     * interrupt status is restored before this method returns.
+     *
+     * @throws IOException If secondary reader failed to close.
+     */
+    @Override public synchronized void close() throws IOException {
+        // Repeated close must not close the secondary reader again or report the same metrics twice.
+        if (closed)
+            return;
+
+        boolean interrupted = false;
+
+        try {
+            if (secReader != null) {
+                // Close secondary input stream.
+                secReader.close();
+
+                // Ensuring local cache futures completion.
+                for (IgniteInternalFuture<byte[]> fut : locCache.values()) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException ignore) {
+                        // No-op: only completion matters here, not the result.
+                    }
+                }
+
+                // Ensuring pending evicted futures completion.
+                while (!pendingFuts.isEmpty()) {
+                    pendingFutsLock.lock();
+
+                    try {
+                        // Timed wait, so a signal missed between the emptiness check and await() is harmless.
+                        pendingFutsCond.await(100, TimeUnit.MILLISECONDS);
+                    }
+                    catch (InterruptedException ignore) {
+                        // Keep waiting, but remember the interrupt so that it is not lost for the caller.
+                        interrupted = true;
+                    }
+                    finally {
+                        pendingFutsLock.unlock();
+                    }
+                }
+
+                // Safety to ensure no orphaned data blocks exist in case file was concurrently deleted.
+                if (!meta.exists(fileInfo.id()))
+                    data.delete(fileInfo);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOError(e); // Something unrecoverable.
+        }
+        finally {
+            closed = true;
+
+            metrics.addReadBytesTime(bytes, time);
+
+            locCache.clear();
+
+            // Restored only after all waiting is over, otherwise await() would fail immediately in a loop.
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes located at the given file position into the buffer. A single call reads
+     * from one block only, so fewer bytes than requested may be returned even if more data is available.
+     * The stream position is not touched.
+     *
+     * @param pos Position to start reading from.
+     * @param buf Data buffer to save read data to.
+     * @param off Offset in the buffer to write data from.
+     * @param len Length of the data to read from the stream.
+     * @return Number of actually read bytes, {@code 0} if {@code len} is zero, or {@code -1} if {@code pos}
+     *      is at or beyond the end of the file.
+     * @throws IOException In case of any IO exception.
+     * @throws IllegalArgumentException If {@code pos} is negative.
+     * @throws NullPointerException If {@code buf} is {@code null}.
+     * @throws IndexOutOfBoundsException If {@code off} and {@code len} do not denote a valid buffer range.
+     */
+    private int readFromStore(long pos, byte[] buf, int off, int len) throws IOException {
+        if (pos < 0)
+            throw new IllegalArgumentException("Read position cannot be negative: " + pos);
+
+        if (buf == null)
+            throw new NullPointerException("Destination buffer cannot be null.");
+
+        // Compare against 'buf.length - off' rather than 'len + off': the sum may overflow int and
+        // wrongly pass the check, while the difference cannot overflow once 'off' is known to be >= 0.
+        if (off < 0 || len < 0 || len > buf.length - off)
+            throw new IndexOutOfBoundsException("Invalid buffer boundaries " +
+                "[buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
+
+        if (len == 0)
+            return 0; // Fully read done: read zero bytes correctly.
+
+        // Readable bytes in the file, starting from the specified position.
+        long readable = fileInfo.length() - pos;
+
+        if (readable <= 0)
+            return -1; // EOF.
+
+        long startTime = System.nanoTime();
+
+        if (readable < len)
+            len = (int)readable; // Truncate expected length to available.
+
+        assert len > 0;
+
+        byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize());
+
+        // Skip bytes to expected position.
+        int blockOff = (int)(pos % fileInfo.blockSize());
+
+        // Never read past the end of the fetched block.
+        len = Math.min(len, block.length - blockOff);
+
+        U.arrayCopy(block, blockOff, buf, off, len);
+
+        bytes += len;
+        time += System.nanoTime() - startTime;
+
+        return len;
+    }
+
+    /**
+     * Method to safely retrieve file block. In case if file block is missing this method will check file map
+     * and update file info. This may be needed when file that we are reading is concurrently fragmented.
+     * <p>
+     * The retry happens at most once and only if the current file info has a non-empty file map; the local
+     * cache is cleared before the retry.
+     *
+     * @param blockIdx Block index to read.
+     * @return Block data.
+     * @throws IOException If read failed.
+     */
+    private byte[] blockFragmentizerSafe(long blockIdx) throws IOException {
+        try {
+            try {
+                return block(blockIdx);
+            }
+            catch (IgniteFsCorruptedFileException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to fetch file block [path=" + path + ", fileInfo=" + fileInfo +
+                        ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']');
+
+                // This failure may be caused by file being fragmented.
+                if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
+                    // Re-read the descriptor from meta by file ID.
+                    IgfsFileInfo newInfo = meta.info(fileInfo.id());
+
+                    // File was deleted.
+                    if (newInfo == null)
+                        throw new IgniteFsFileNotFoundException("Failed to read file block (file was concurrently " +
+                                "deleted) [path=" + path + ", blockIdx=" + blockIdx + ']');
+
+                    fileInfo = newInfo;
+
+                    // Must clear cache as it may have failed futures.
+                    locCache.clear();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Updated input stream file info after block fetch failure [path=" + path
+                            + ", fileInfo=" + fileInfo + ']');
+
+                    // Second and last attempt, now with the refreshed file info.
+                    return block(blockIdx);
+                }
+
+                // File map is absent or empty: report the original failure as an I/O error.
+                throw new IOException(e.getMessage(), e);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            // Checked Ignite failures raised while fetching the block are converted to IOException.
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Gets data of the given block, either from the local cache of read futures or by requesting it from
+     * the data manager. Also tracks sequential access and schedules prefetch of the following blocks once
+     * enough sequential reads have been observed.
+     *
+     * @param blockIdx Block index.
+     * @return File block data.
+     * @throws IOException If the stream is closed and the block is not cached, or if the block is shorter
+     *      than expected for its position in the file.
+     * @throws IgniteCheckedException If failed.
+     */
+    private byte[] block(long blockIdx) throws IOException, IgniteCheckedException {
+        assert blockIdx >= 0;
+
+        IgniteInternalFuture<byte[]> bytesFut = locCache.get(blockIdx);
+
+        if (bytesFut == null) {
+            if (closed)
+                throw new IOException("Stream is already closed: " + this);
+
+            // A read is sequential if it targets the block right after the previously requested one.
+            if (prevBlockIdx != -1 && prevBlockIdx + 1 == blockIdx)
+                seqReads++;
+            else
+                seqReads = 0;
+
+            prevBlockIdx = blockIdx;
+
+            bytesFut = dataBlock(fileInfo, blockIdx);
+
+            assert bytesFut != null;
+
+            addLocalCacheFuture(blockIdx, bytesFut);
+        }
+
+        // Schedule the next block(s) prefetch.
+        if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) {
+            for (int i = 1; i <= prefetchBlocks; i++) {
+                // Ensure that we do not prefetch over file size.
+                if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length())
+                    break;
+                else if (locCache.get(blockIdx + i) == null)
+                    addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i));
+            }
+        }
+
+        byte[] bytes = bytesFut.get();
+
+        if (bytes == null)
+            throw new IgniteFsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " +
+                "[path=" + path + ", blockIdx=" + blockIdx + ']');
+
+        int blockSize = fileInfo.blockSize();
+
+        if (blockIdx == fileInfo.blocksCount() - 1) {
+            int tailSize = (int)(fileInfo.length() % blockSize);
+
+            // Zero remainder means the last block is completely filled. Expecting 0 bytes in that case
+            // would make the consistency check below pass for any block, however short.
+            if (tailSize != 0)
+                blockSize = tailSize;
+        }
+
+        // If part of the file was reserved for writing, but was not actually written.
+        if (bytes.length < blockSize)
+            throw new IOException("Inconsistent file's data block (incorrectly written?)" +
+                " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length +
+                ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() +
+                ", fileLen=" + fileInfo.length() + ']');
+
+        return bytes;
+    }
+
+    /**
+     * Puts a block future into the local cache unless the block is already cached. When the cache is full,
+     * the entry returned first by the cache iterator is evicted; if the evicted future is not done yet, it
+     * is tracked in the pending set until it completes.
+     *
+     * @param idx Block index.
+     * @param fut Future.
+     */
+    private void addLocalCacheFuture(long idx, IgniteInternalFuture<byte[]> fut) {
+        assert Thread.holdsLock(this);
+
+        if (locCache.containsKey(idx))
+            return; // Block is already cached, keep the existing future.
+
+        if (locCache.size() == maxLocCacheSize) {
+            Long evictKey = locCache.keySet().iterator().next();
+
+            final IgniteInternalFuture<byte[]> evictFut = locCache.remove(evictKey);
+
+            if (!evictFut.isDone()) {
+                pendingFuts.add(evictFut);
+
+                evictFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<byte[]>>() {
+                    @Override public void apply(IgniteInternalFuture<byte[]> t) {
+                        pendingFuts.remove(evictFut);
+
+                        // Wake up a thread that may be waiting for pending futures in close().
+                        pendingFutsLock.lock();
+
+                        try {
+                            pendingFutsCond.signalAll();
+                        }
+                        finally {
+                            pendingFutsLock.unlock();
+                        }
+                    }
+                });
+            }
+        }
+
+        locCache.put(idx, fut);
+    }
+
+    /**
+     * Get data block for specified block index. Delegates to the data manager, passing this stream's path
+     * and secondary file system reader along with the arguments.
+     *
+     * @param fileInfo File info.
+     * @param blockIdx Block index.
+     * @return Future for the requested data block; declared {@code @Nullable}, though {@code block()}
+     *      asserts a non-null result for the block being read.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsFileInfo fileInfo, long blockIdx) throws IgniteCheckedException {
+        return data.dataBlock(fileInfo, path, blockIdx, secReader);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * String form is produced by the {@code S.toString} helper for this class.
+     */
+    @Override public String toString() {
+        return S.toString(IgfsInputStreamImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInvalidRangeException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInvalidRangeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInvalidRangeException.java
new file mode 100644
index 0000000..edb73a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsInvalidRangeException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+
+/**
+ * Internal exception thrown on an attempt to update a range that is no longer present
+ * in the file affinity map.
+ */
+public class IgfsInvalidRangeException extends IgniteCheckedException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an exception with the given message.
+     *
+     * @param msg Error message.
+     */
+    public IgfsInvalidRangeException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception wrapping the given cause.
+     *
+     * @param cause Error cause.
+     */
+    public IgfsInvalidRangeException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
new file mode 100644
index 0000000..76bdabe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsIpcHandler.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.fs.common.*;
+import org.apache.ignite.internal.processors.closure.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * GGFS IPC handler.
+ */
+class IgfsIpcHandler implements IgfsServerHandler {
+    /** For test purposes only. */
+    @SuppressWarnings("UnusedDeclaration")
+    private static boolean errWrite;
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Log. */
+    private IgniteLogger log;
+
+    /** Buffer size passed to GGFS when streams are opened. */
+    private final int bufSize; // Must not be less than file block size.
+
+    /** GGFS instance for this handler. */
+    private IgfsEx ggfs;
+
+    /** Resource ID generator. */
+    private AtomicLong rsrcIdGen = new AtomicLong();
+
+    /** Stopping flag. */
+    private volatile boolean stopping;
+
+    /**
+     * Creates GGFS IPC handler bound to the GGFS instance of the given context.
+     *
+     * @param ggfsCtx Context.
+     */
+    IgfsIpcHandler(IgfsContext ggfsCtx) {
+        assert ggfsCtx != null;
+
+        ctx = ggfsCtx.kernalContext();
+        ggfs = ggfsCtx.ggfs();
+
+        int blockSize = ggfsCtx.configuration().getBlockSize();
+
+        // Keep buffer size multiple of block size so no extra byte array copies is performed.
+        bufSize = blockSize * 2;
+
+        log = ctx.log(IgfsIpcHandler.class);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only raises the stopping flag; {@code handleAsync} returns {@code null} once the flag is set.
+     */
+    @Override public void stop() throws IgniteCheckedException {
+        stopping = true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes every resource registered in the session. A failure to close one resource is logged as
+     * a warning and does not prevent closing of the remaining ones.
+     */
+    @Override public void onClosed(IgfsClientSession ses) {
+        for (Iterator<Closeable> rsrcs = ses.registeredResources(); rsrcs.hasNext(); ) {
+            Closeable rsrc = rsrcs.next();
+
+            try {
+                rsrc.close();
+            }
+            catch (IOException e) {
+                U.warn(log, "Failed to close opened stream on client close event (will continue) [ses=" + ses +
+                    ", stream=" + rsrc + ']', e);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code WRITE_BLOCK}, {@code MAKE_DIRECTORIES}, {@code LIST_FILES} and {@code LIST_PATHS} are executed
+     * synchronously in the calling thread; all other commands are submitted to the GGFS pool.
+     *
+     * @return Future with the response message, a finished future carrying the error if processing failed
+     *      in the calling thread, or {@code null} if the handler is stopping or a synchronously executed
+     *      command produced no response.
+     */
+    @Override public IgniteInternalFuture<GridGgfsMessage> handleAsync(final IgfsClientSession ses,
+        final GridGgfsMessage msg, DataInput in) {
+        try {
+            // Even if will be closed right after this call, response write error will be ignored.
+            if (stopping)
+                return null;
+
+            final GridGgfsIpcCommand cmd = msg.command();
+
+            switch (cmd) {
+                // Execute not-blocking command synchronously in worker thread.
+                case WRITE_BLOCK:
+                case MAKE_DIRECTORIES:
+                case LIST_FILES:
+                case LIST_PATHS: {
+                    GridGgfsMessage res = execute(ses, cmd, msg, in);
+
+                    return res == null ? null : new GridFinishedFuture<>(ctx, res);
+                }
+
+                // Execute command asynchronously in user's pool.
+                default:
+                    return ctx.closure().callLocalSafe(new GridPlainCallable<GridGgfsMessage>() {
+                        @Override public GridGgfsMessage call() throws Exception {
+                            // No need to pass data input for non-write-block commands.
+                            return execute(ses, cmd, msg, null);
+                        }
+                    }, GridClosurePolicy.GGFS_POOL);
+            }
+        }
+        catch (Exception e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+    }
+
+    /**
+     * Execute GGFS command by routing it to the processor responsible for its command group:
+     * handshake, status, path control or stream control.
+     *
+     * @param ses Client connection session.
+     * @param cmd Command to execute.
+     * @param msg Message to process.
+     * @param in Data input in case of block write command.
+     * @return Command execution result.
+     * @throws Exception If failed; in particular {@link IgniteCheckedException} for an unsupported command.
+     */
+    private GridGgfsMessage execute(IgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg,
+        @Nullable DataInput in)
+        throws Exception {
+        switch (cmd) {
+            case HANDSHAKE:
+                return processHandshakeRequest((GridGgfsHandshakeRequest)msg);
+
+            case STATUS:
+                return processStatusRequest();
+
+            // Commands that are handled by the path control processor.
+            case EXISTS:
+            case INFO:
+            case PATH_SUMMARY:
+            case UPDATE:
+            case RENAME:
+            case DELETE:
+            case MAKE_DIRECTORIES:
+            case LIST_PATHS:
+            case LIST_FILES:
+            case SET_TIMES:
+            case AFFINITY:
+            case OPEN_READ:
+            case OPEN_CREATE:
+            case OPEN_APPEND:
+                return processPathControlRequest(ses, cmd, msg);
+
+            // Commands that are handled by the stream control processor; only these receive the data input.
+            case CLOSE:
+            case READ_BLOCK:
+            case WRITE_BLOCK:
+                return processStreamControlRequest(ses, cmd, msg, in);
+
+            default:
+                throw new IgniteCheckedException("Unsupported IPC command: " + cmd);
+        }
+    }
+
+    /**
+     * Processes handshake request: verifies that the client addresses this grid and this GGFS instance,
+     * passes the client log directory to GGFS and replies with the GGFS parameters.
+     *
+     * @param req Handshake request.
+     * @return Response message.
+     * @throws IgniteCheckedException In case of handshake failure (grid name or GGFS name mismatch).
+     */
+    private GridGgfsMessage processHandshakeRequest(GridGgfsHandshakeRequest req) throws IgniteCheckedException {
+        boolean sameGrid = F.eq(ctx.gridName(), req.gridName());
+
+        if (!sameGrid)
+            throw new IgniteCheckedException("Failed to perform handshake because actual Grid name differs from expected " +
+                "[expected=" + req.gridName() + ", actual=" + ctx.gridName() + ']');
+
+        boolean sameGgfs = F.eq(ggfs.name(), req.ggfsName());
+
+        if (!sameGgfs)
+            throw new IgniteCheckedException("Failed to perform handshake because actual GGFS name differs from expected " +
+                "[expected=" + req.ggfsName() + ", actual=" + ggfs.name() + ']');
+
+        GridGgfsControlResponse resp = new GridGgfsControlResponse();
+
+        ggfs.clientLogDirectory(req.logDirectory());
+
+        resp.handshake(new IgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(),
+            ggfs.groupBlockSize(), ggfs.globalSampling()));
+
+        return resp;
+    }
+
+    /**
+     * Processes status request by wrapping the result of {@code ggfs.globalSpace()} into a control response.
+     *
+     * @return Status response.
+     * @throws IgniteCheckedException If failed.
+     */
+    private GridGgfsMessage processStatusRequest() throws IgniteCheckedException {
+        // Query the status first, so that no response object is created if the query fails.
+        IgfsStatus space = ggfs.globalSpace();
+
+        GridGgfsControlResponse resp = new GridGgfsControlResponse();
+
+        resp.status(space);
+
+        return resp;
+    }
+
+    /**
+     * Processes path control request.
+     *
+     * @param ses Session.
+     * @param cmd Command.
+     * @param msg Message.
+     * @return Response message.
+     * @throws IgniteCheckedException If failed.
+     */
+    private GridGgfsMessage processPathControlRequest(IgfsClientSession ses, GridGgfsIpcCommand cmd,
+        GridGgfsMessage msg) throws IgniteCheckedException {
+        GridGgfsPathControlRequest req = (GridGgfsPathControlRequest)msg;
+
+        if (log.isDebugEnabled())
+            log.debug("Processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + ']');
+
+        GridGgfsControlResponse res = new GridGgfsControlResponse();
+
+        // Dispatch on the command; every branch stores its outcome in the response object.
+        try {
+            switch (cmd) {
+                case EXISTS:
+                    res.response(ggfs.exists(req.path()));
+
+                    break;
+
+                case INFO:
+                    res.response(ggfs.info(req.path()));
+
+                    break;
+
+                case PATH_SUMMARY:
+                    res.response(ggfs.summary(req.path()));
+
+                    break;
+
+                case UPDATE:
+                    res.response(ggfs.update(req.path(), req.properties()));
+
+                    break;
+
+                case RENAME:
+                    // Result of the call is not used; success is reported explicitly.
+                    ggfs.rename(req.path(), req.destinationPath());
+
+                    res.response(true);
+
+                    break;
+
+                case DELETE:
+                    res.response(ggfs.delete(req.path(), req.flag()));
+
+                    break;
+
+                case MAKE_DIRECTORIES:
+                    // Result of the call is not used; success is reported explicitly.
+                    ggfs.mkdirs(req.path(), req.properties());
+
+                    res.response(true);
+
+                    break;
+
+                case LIST_PATHS:
+                    res.paths(ggfs.listPaths(req.path()));
+
+                    break;
+
+                case LIST_FILES:
+                    res.files(ggfs.listFiles(req.path()));
+
+                    break;
+
+                case SET_TIMES:
+                    // Result of the call is not used; success is reported explicitly.
+                    ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+
+                    res.response(true);
+
+                    break;
+
+                case AFFINITY:
+                    res.locations(ggfs.affinity(req.path(), req.start(), req.length()));
+
+                    break;
+
+                case OPEN_READ: {
+                    // Request flag selects the open() overload that also takes the
+                    // sequential-reads-before-prefetch setting from the request.
+                    IgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) :
+                        ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+
+                    // Stream is bound to the client session; its ID is sent back in the descriptor below.
+                    long streamId = registerResource(ses, ggfsIn);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" +
+                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+
+                    // NOTE(review): this copy of the stream's file info is used only to read its length
+                    // below - confirm whether ggfsIn.fileInfo().length() would be equivalent.
+                    IgfsFileInfo info = new IgfsFileInfo(ggfsIn.fileInfo(), null,
+                        ggfsIn.fileInfo().modificationTime());
+
+                    res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+
+                    break;
+                }
+
+                case OPEN_CREATE: {
+                    // Created output stream is bound to the client session; only its ID is returned.
+                    long streamId = registerResource(ses, ggfs.create(
+                        req.path(),       // Path.
+                        bufSize,          // Buffer size.
+                        req.flag(),       // Overwrite if exists.
+                        affinityKey(req), // Affinity key based on replication factor.
+                        req.replication(),// Replication factor.
+                        req.blockSize(),  // Block size.
+                        req.properties()  // File properties.
+                    ));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" +
+                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+
+                    res.response(streamId);
+
+                    break;
+                }
+
+                case OPEN_APPEND: {
+                    // Output stream for append is bound to the client session; only its ID is returned.
+                    long streamId = registerResource(ses, ggfs.append(
+                        req.path(),        // Path.
+                        bufSize,           // Buffer size.
+                        req.flag(),        // Create if absent.
+                        req.properties()   // File properties.
+                    ));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" +
+                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+
+                    res.response(streamId);
+
+                    break;
+                }
+
+                default:
+                    // Reached only for a command not handled above; fails only when assertions are enabled,
+                    // otherwise the response created above is returned without a result.
+                    assert false : "Unhandled path control request command: " + cmd;
+
+                    break;
+            }
+        }
+        catch (IgniteException e) {
+            // Convert unchecked exception into the checked one declared by this method.
+            throw new IgniteCheckedException(e);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Finished processing path control request [ggfsName=" + ggfs.name() + ", req=" + req +
+                ", res=" + res + ']');
+
+        return res;
+    }
+
+    /**
+     * Processes stream control request: closes a registered resource, reads a block from a registered
+     * input stream or writes a block to a registered output stream.
+     *
+     * @param ses Session.
+     * @param cmd Command.
+     * @param msg Message.
+     * @param in Data input to read.
+     * @return Response message, or {@code null} if a block was written successfully (no response is
+     *      sent in that case).
+     * @throws IgniteCheckedException If failed, in particular if the resource with the requested ID is
+     *      not registered in the session.
+     * @throws IOException If an I/O error occurs (for example, while closing the resource).
+     */
+    private GridGgfsMessage processStreamControlRequest(IgfsClientSession ses, GridGgfsIpcCommand cmd,
+        GridGgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException {
+        GridGgfsStreamControlRequest req = (GridGgfsStreamControlRequest)msg;
+
+        Long rsrcId = req.streamId();
+
+        GridGgfsControlResponse resp = new GridGgfsControlResponse();
+
+        switch (cmd) {
+            case CLOSE: {
+                Closeable res = resource(ses, rsrcId);
+
+                if (log.isDebugEnabled())
+                    log.debug("Requested to close resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId +
+                        ", res=" + res + ']');
+
+                if (res == null)
+                    throw new IgniteCheckedException("Resource to close not found: " + rsrcId);
+
+                // NOTE(review): if close() throws, the resource is not unregistered here - confirm that
+                // it is released when the session itself is closed.
+                try {
+                    res.close();
+                }
+                catch (IOException e) {
+                    // Unwrap OutOfSpaceException, if has one.
+                    IgniteFsOutOfSpaceException space = X.cause(e, IgniteFsOutOfSpaceException.class);
+
+                    if (space != null)
+                        throw space;
+
+                    throw e;
+                }
+
+                boolean success = ses.unregisterResource(rsrcId, res);
+
+                assert success : "Failed to unregister resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId +
+                    ", res=" + res + ']';
+
+                if (log.isDebugEnabled())
+                    log.debug("Closed GGFS stream [ggfsName=" + ggfs.name() + ", streamId=" + rsrcId +
+                        ", ses=" + ses + ']');
+
+                resp.response(true);
+
+                break;
+            }
+
+            case READ_BLOCK: {
+                long pos = req.position();
+                int size = req.length();
+
+                IgfsInputStreamAdapter ggfsIn = (IgfsInputStreamAdapter)resource(ses, rsrcId);
+
+                if (ggfsIn == null)
+                    throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId);
+
+                byte[][] chunks = ggfsIn.readChunks(pos, size);
+
+                resp.response(chunks);
+
+                // Calculate number of read bytes.
+                // len = len(first) + (n - 2) * len(block) + len(last).
+                // NOTE(review): assumes every chunk between the first and the last one has the same length
+                // as chunks[1] - confirm that readChunks() guarantees this.
+                int len = 0;
+
+                if (chunks.length > 0)
+                    len += chunks[0].length;
+
+                if (chunks.length > 1)
+                    len += chunks[chunks.length - 1].length;
+
+                if (chunks.length > 2)
+                    len += chunks[1].length * (chunks.length - 2);
+
+                resp.length(len);
+
+                break;
+            }
+
+            case WRITE_BLOCK: {
+                assert rsrcId != null : "Missing stream ID";
+
+                IgniteFsOutputStream out = (IgniteFsOutputStream)resource(ses, rsrcId);
+
+                if (out == null)
+                    throw new IgniteCheckedException("Output stream not found (already closed?): " + rsrcId);
+
+                int writeLen = req.length();
+
+                // This try block either returns or breaks, so control never falls through to 'default'.
+                try {
+                    out.transferFrom(in, writeLen);
+
+                    // Presumably a test-only switch simulating a write failure (judging by the message) -
+                    // verify against the place where errWrite is declared.
+                    if (errWrite)
+                        throw new IOException("Failed to write data to server (test).");
+
+                    // No response needed.
+                    return null;
+                }
+                catch (IOException e) {
+                    // Write failure is reported in the response (message only) instead of being rethrown.
+                    resp.error(rsrcId, e.getMessage());
+
+                    break;
+                }
+            }
+
+            default:
+                // Reached only for a command not handled above; fails only when assertions are enabled.
+                assert false;
+
+                break;
+        }
+
+        return resp;
+    }
+
+    /**
+     * Produces affinity key to be used for a file being created, if the request asks for colocation.
+     *
+     * @param req Path control request.
+     * @return Newly generated affinity key if {@code req.colocate()} is {@code true},
+     *      {@code null} otherwise.
+     */
+    @Nullable private IgniteUuid affinityKey(GridGgfsPathControlRequest req) {
+        if (req.colocate()) {
+            IgniteUuid affKey = ggfs.nextAffinityKey();
+
+            if (log.isDebugEnabled())
+                log.debug("Generated affinity key for path control request [ggfsName=" + ggfs.name() +
+                    ", req=" + req + ", key=" + affKey + ']');
+
+            return affKey;
+        }
+
+        // Colocation was not requested, so no affinity key is generated.
+        if (log.isDebugEnabled())
+            log.debug("Will not generate affinity key for path control request [ggfsName=" + ggfs.name() +
+                ", req=" + req + ']');
+
+        return null;
+    }
+
+    /**
+     * Registers closeable resource within client session.
+     *
+     * @param ses IPC session.
+     * @param rsrc Resource to register.
+     * @return Registration resource ID.
+     */
+    private long registerResource(IgfsClientSession ses, Closeable rsrc) {
+        long rsrcId = rsrcIdGen.getAndIncrement();
+
+        boolean registered = ses.registerResource(rsrcId, rsrc);
+
+        assert registered : "Failed to register resource (duplicate id?): " + rsrcId;
+
+        return rsrcId;
+    }
+
+    /**
+     * Gets resource by resource ID from client session.
+     *
+     * @param ses Session to get resource from.
+     * @param rsrcId Resource ID.
+     * @return Registered resource or {@code null} if not found.
+     */
+    @Nullable private Closeable resource(IgfsClientSession ses, Long rsrcId) {
+        return ses.resource(rsrcId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsJobImpl.java
new file mode 100644
index 0000000..ebd37a2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsJobImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.ignitefs.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.resources.*;
+
+import java.io.*;
+
+/**
+ * Compute job that opens a GGFS file, optionally adjusts the requested split with a record resolver
+ * and passes the resulting range together with the opened stream to the wrapped {@link IgniteFsJob}.
+ */
+public class IgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFsJob> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Wrapped GGFS job. */
+    private IgniteFsJob job;
+
+    /** Name of the GGFS the file belongs to. */
+    private String ggfsName;
+
+    /** Path of the file to process. */
+    private IgniteFsPath path;
+
+    /** Start offset of the split. */
+    private long start;
+
+    /** Length of the split. */
+    private long len;
+
+    /** Record resolver applied to the split before execution, may be {@code null}. */
+    private IgniteFsRecordResolver rslvr;
+
+    /** Injected grid. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Injected logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /**
+     * @param job GGFS job.
+     * @param ggfsName GGFS name.
+     * @param path Split path.
+     * @param start Split start offset.
+     * @param len Split length.
+     * @param rslvr GGFS split resolver.
+     */
+    public IgfsJobImpl(IgniteFsJob job, String ggfsName, IgniteFsPath path, long start, long len,
+        IgniteFsRecordResolver rslvr) {
+        this.job = job;
+        this.ggfsName = ggfsName;
+        this.path = path;
+        this.start = start;
+        this.len = len;
+        this.rslvr = rslvr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object execute() {
+        IgniteFs igfs = ignite.fileSystem(ggfsName);
+
+        try (IgniteFsInputStream stream = igfs.open(path)) {
+            IgniteFsFileRange reqRange = new IgniteFsFileRange(path, start, len);
+
+            // Without a resolver the requested range is used as is.
+            IgniteFsFileRange range = rslvr == null ? reqRange : rslvr.resolveRecords(igfs, stream, reqRange);
+
+            // Only the resolver can produce null here, meaning there is nothing to process.
+            if (range == null) {
+                log.warning("No data found for split on local node after resolver is applied " +
+                    "[ggfsName=" + ggfsName + ", path=" + path + ", start=" + start + ", len=" + len + ']');
+
+                return null;
+            }
+
+            // Position the stream at the beginning of the (possibly adjusted) range.
+            stream.seek(range.start());
+
+            return job.execute(igfs, new IgniteFsFileRange(path, range.start(), range.length()), stream);
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to execute GGFS job for file split [ggfsName=" + ggfsName +
+                ", path=" + path + ", start=" + start + ", len=" + len + ']', e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        job.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFsJob userObject() {
+        return job;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsListingEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsListingEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsListingEntry.java
new file mode 100644
index 0000000..795099b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsListingEntry.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Directory listing entry.
+ */
+public class IgfsListingEntry implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** File id. */
+    private IgniteUuid fileId;
+
+    /** File affinity key. */
+    private IgniteUuid affKey;
+
+    /** Positive block size if file, 0 if directory. */
+    private int blockSize;
+
+    /** File length. */
+    private long len;
+
+    /** Last access time. */
+    private long accessTime;
+
+    /** Last modification time. */
+    private long modificationTime;
+
+    /** File properties. */
+    private Map<String, String> props;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsListingEntry() {
+        // No-op.
+    }
+
+    /**
+     * @param fileInfo File info to construct listing entry from.
+     */
+    public IgfsListingEntry(IgfsFileInfo fileInfo) {
+        fileId = fileInfo.id();
+        affKey = fileInfo.affinityKey();
+
+        if (fileInfo.isFile()) {
+            blockSize = fileInfo.blockSize();
+            len = fileInfo.length();
+        }
+
+        props = fileInfo.properties();
+        accessTime = fileInfo.accessTime();
+        modificationTime = fileInfo.modificationTime();
+    }
+
+    /**
+     * Creates listing entry with updated length.
+     *
+     * @param entry Entry.
+     * @param len New length.
+     */
+    public IgfsListingEntry(IgfsListingEntry entry, long len, long accessTime, long modificationTime) {
+        fileId = entry.fileId;
+        affKey = entry.affKey;
+        blockSize = entry.blockSize;
+        props = entry.props;
+        this.accessTime = accessTime;
+        this.modificationTime = modificationTime;
+
+        this.len = len;
+    }
+
+    /**
+     * @return Entry file ID.
+     */
+    public IgniteUuid fileId() {
+        return fileId;
+    }
+
+    /**
+     * @return File affinity key, if specified.
+     */
+    public IgniteUuid affinityKey() {
+        return affKey;
+    }
+
+    /**
+     * @return {@code True} if entry represents file.
+     */
+    public boolean isFile() {
+        return blockSize > 0;
+    }
+
+    /**
+     * @return {@code True} if entry represents directory.
+     */
+    public boolean isDirectory() {
+        return blockSize == 0;
+    }
+
+    /**
+     * @return Block size.
+     */
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /**
+     * @return Last access time.
+     */
+    public long accessTime() {
+        return accessTime;
+    }
+
+    /**
+     * @return Last modification time.
+     */
+    public long modificationTime() {
+        return modificationTime;
+    }
+
+    /**
+     * @return Properties map.
+     */
+    public Map<String, String> properties() {
+        return props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, fileId);
+        out.writeInt(blockSize);
+        out.writeLong(len);
+        U.writeStringMap(out, props);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        fileId = U.readGridUuid(in);
+        blockSize = in.readInt();
+        len = in.readLong();
+        props = U.readStringMap(in);
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof IgfsListingEntry)) return false;
+
+        IgfsListingEntry that = (IgfsListingEntry)o;
+
+        return fileId.equals(that.fileId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return fileId.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsListingEntry.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsLocalMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsLocalMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsLocalMetrics.java
new file mode 100644
index 0000000..8396708
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsLocalMetrics.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+
+/**
+ * Value object holding all local GGFS metrics which cannot be determined using file system traversal.
+ * <p>
+ * Resettable metrics are kept as pairs of adders behind volatile references; {@link #reset()} replaces
+ * the pairs with fresh ones instead of zeroing the existing adders.
+ */
+public class IgfsLocalMetrics {
+    /** Block reads. First value - total reads, second value - reads delegated to the secondary file system. */
+    private volatile IgniteBiTuple<LongAdder, LongAdder> blocksRead;
+
+    /** Block writes. First value - total writes, second value - writes delegated to the secondary file system. */
+    private volatile IgniteBiTuple<LongAdder, LongAdder> blocksWritten;
+
+    /** Byte reads. First value - total bytes read, second value - consumed time. */
+    private volatile IgniteBiTuple<LongAdder, LongAdder> bytesRead;
+
+    /** Byte writes. First value - total bytes written, second value - consumed time. */
+    private volatile IgniteBiTuple<LongAdder, LongAdder> bytesWritten;
+
+    /** Number of files opened for read. */
+    private final LongAdder filesOpenedForRead = new LongAdder();
+
+    /** Number of files opened for write. */
+    private final LongAdder filesOpenedForWrite = new LongAdder();
+
+    /**
+     * Constructor.
+     */
+    IgfsLocalMetrics() {
+        reset();
+    }
+
+    /**
+     * @return Read bytes.
+     */
+    long readBytes() {
+        return bytesRead.get1().longValue();
+    }
+
+    /**
+     * @return Read bytes time.
+     */
+    long readBytesTime() {
+        return bytesRead.get2().longValue();
+    }
+
+    /**
+     * Adds given numbers to read bytes and read time.
+     *
+     * @param readBytes Number of bytes read.
+     * @param readTime Read time.
+     */
+    void addReadBytesTime(long readBytes, long readTime) {
+        add(bytesRead, readBytes, readTime);
+    }
+
+    /**
+     * @return Written bytes.
+     */
+    long writeBytes() {
+        return bytesWritten.get1().longValue();
+    }
+
+    /**
+     * @return Write bytes time.
+     */
+    long writeBytesTime() {
+        return bytesWritten.get2().longValue();
+    }
+
+    /**
+     * Adds given numbers to written bytes and write time.
+     *
+     * @param writtenBytes Number of bytes written.
+     * @param writeTime Write time.
+     */
+    void addWrittenBytesTime(long writtenBytes, long writeTime) {
+        add(bytesWritten, writtenBytes, writeTime);
+    }
+
+    /**
+     * @return Read blocks.
+     */
+    long readBlocks() {
+        return blocksRead.get1().longValue();
+    }
+
+    /**
+     * @return Blocks read from secondary file system.
+     */
+    long readBlocksSecondary() {
+        return blocksRead.get2().longValue();
+    }
+
+    /**
+     * Adds given numbers to read blocks counters.
+     *
+     * @param total Total number of blocks read.
+     * @param secondary Number of blocks read from secondary FS.
+     */
+    void addReadBlocks(int total, int secondary) {
+        add(blocksRead, total, secondary);
+    }
+
+    /**
+     * @return Written blocks.
+     */
+    long writeBlocks() {
+        return blocksWritten.get1().longValue();
+    }
+
+    /**
+     * @return Written blocks to secondary file system.
+     */
+    long writeBlocksSecondary() {
+        return blocksWritten.get2().longValue();
+    }
+
+    /**
+     * Adds given numbers to write blocks counters.
+     *
+     * @param total Total number of blocks written.
+     * @param secondary Number of blocks written to secondary FS.
+     */
+    void addWriteBlocks(int total, int secondary) {
+        add(blocksWritten, total, secondary);
+    }
+
+    /**
+     * Increment files opened for read.
+     */
+    void incrementFilesOpenedForRead() {
+        filesOpenedForRead.increment();
+    }
+
+    /**
+     * Decrement files opened for read.
+     */
+    void decrementFilesOpenedForRead() {
+        filesOpenedForRead.decrement();
+    }
+
+    /**
+     * @return Files opened for read.
+     */
+    int filesOpenedForRead() {
+        return filesOpenedForRead.intValue();
+    }
+
+    /**
+     * Increment files opened for write.
+     */
+    void incrementFilesOpenedForWrite() {
+        filesOpenedForWrite.increment();
+    }
+
+    /**
+     * Decrement files opened for write.
+     */
+    void decrementFilesOpenedForWrite() {
+        filesOpenedForWrite.decrement();
+    }
+
+    /**
+     * @return Files opened for write.
+     */
+    int filesOpenedForWrite() {
+        return filesOpenedForWrite.intValue();
+    }
+
+    /**
+     * Reset summary counters. Counters of files opened for read and write are not affected.
+     */
+    void reset() {
+        blocksRead = newCounters();
+        blocksWritten = newCounters();
+        bytesRead = newCounters();
+        bytesWritten = newCounters();
+    }
+
+    /**
+     * @return New pair of zero-valued adders.
+     */
+    private static IgniteBiTuple<LongAdder, LongAdder> newCounters() {
+        return F.t(new LongAdder(), new LongAdder());
+    }
+
+    /**
+     * Adds given values to both adders of the pair. Callers pass the volatile field directly, so the
+     * reference is read once and both values land in the same pair.
+     *
+     * @param cntrs Pair of adders.
+     * @param first Value to add to the first adder.
+     * @param second Value to add to the second adder.
+     */
+    private static void add(IgniteBiTuple<LongAdder, LongAdder> cntrs, long first, long second) {
+        cntrs.get1().add(first);
+        cntrs.get2().add(second);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsManager.java
new file mode 100644
index 0000000..21ec0cc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsManager.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.fs;
+
+import org.apache.ignite.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ * Abstract class for GGFS managers. Provides lifecycle entry points which delegate to overridable
+ * {@code *0} hooks and log the corresponding debug message afterwards.
+ */
+public abstract class IgfsManager {
+    /** GGFS context. */
+    protected IgfsContext ggfsCtx;
+
+    /** Logger. */
+    protected IgniteLogger log;
+
+    /** Starting flag, set on the first call to {@link #start(IgfsContext)}. */
+    private final AtomicBoolean starting = new AtomicBoolean();
+
+    /**
+     * Called when GGFS processor is started.
+     *
+     * @param ggfsCtx GGFS context.
+     * @throws IgniteCheckedException If {@link #start0()} failed.
+     */
+    public void start(IgfsContext ggfsCtx) throws IgniteCheckedException {
+        if (!starting.compareAndSet(false, true))
+            assert false : "Method start is called more than once for manager: " + this;
+
+        assert ggfsCtx != null;
+
+        this.ggfsCtx = ggfsCtx;
+
+        log = ggfsCtx.kernalContext().log(getClass());
+
+        start0();
+
+        if (log != null && log.isDebugEnabled())
+            log.debug(startInfo());
+    }
+
+    /**
+     * Stops manager. Does nothing if the manager has never been started.
+     *
+     * @param cancel Cancel flag.
+     */
+    public final void stop(boolean cancel) {
+        if (!starting.get())
+            // Ignoring attempt to stop manager that has never been started.
+            return;
+
+        stop0(cancel);
+
+        if (log != null && log.isDebugEnabled())
+            log.debug(stopInfo());
+    }
+
+    /**
+     * Kernal start callback.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void onKernalStart() throws IgniteCheckedException {
+        onKernalStart0();
+
+        if (log != null && log.isDebugEnabled())
+            log.debug(kernalStartInfo());
+    }
+
+    /**
+     * Kernal stop callback. Does nothing if the manager has never been started.
+     *
+     * @param cancel Cancel flag.
+     */
+    public final void onKernalStop(boolean cancel) {
+        if (!starting.get())
+            // Ignoring attempt to stop manager that has never been started.
+            return;
+
+        onKernalStop0(cancel);
+
+        if (log != null && log.isDebugEnabled())
+            log.debug(kernalStopInfo());
+    }
+
+    /**
+     * Start manager implementation.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void start0() throws IgniteCheckedException {
+        // No-op by default.
+    }
+
+    /**
+     * Stop manager implementation.
+     *
+     * @param cancel Cancel flag.
+     */
+    protected void stop0(boolean cancel) {
+        // No-op by default.
+    }
+
+    /**
+     * Kernal start callback implementation.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void onKernalStart0() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /**
+     * Kernal stop callback implementation.
+     *
+     * @param cancel Cancel flag.
+     */
+    protected void onKernalStop0(boolean cancel) {
+        // No-op.
+    }
+
+    /**
+     * @return Start info.
+     */
+    protected String startInfo() {
+        return "GGFS manager started: " + getClass().getSimpleName();
+    }
+
+    /**
+     * @return Stop info.
+     */
+    protected String stopInfo() {
+        return "GGFS manager stopped: " + getClass().getSimpleName();
+    }
+
+    /**
+     * @return Kernal start info.
+     */
+    protected String kernalStartInfo() {
+        return "GGFS manager received onKernalStart() callback: " + getClass().getSimpleName();
+    }
+
+    /**
+     * @return Kernal stop info.
+     */
+    protected String kernalStopInfo() {
+        return "GGFS manager received onKernalStop() callback: " + getClass().getSimpleName();
+    }
+}


Mime
View raw message