ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [11/57] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 10:54:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInputStreamImpl.java
deleted file mode 100644
index 44fdd63..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInputStreamImpl.java
+++ /dev/null
@@ -1,533 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.ignitefs.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Input stream to read data from grid cache with separate blocks.
- */
-public class GridGgfsInputStreamImpl extends GridGgfsInputStreamAdapter {
-    /** Empty chunks result. */
-    private static final byte[][] EMPTY_CHUNKS = new byte[0][];
-
-    /** Meta manager. */
-    private final GridGgfsMetaManager meta;
-
-    /** Data manager. */
-    private final GridGgfsDataManager data;
-
-    /** Secondary file system reader. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private final IgniteFsReader secReader;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Path to file. */
-    protected final IgniteFsPath path;
-
-    /** File descriptor. */
-    private volatile GridGgfsFileInfo fileInfo;
-
-    /** The number of already read bytes. Important! Access to the property is guarded by this object lock. */
-    private long pos;
-
-    /** Local cache. */
-    private final Map<Long, IgniteInternalFuture<byte[]>> locCache;
-
-    /** Maximum local cache size. */
-    private final int maxLocCacheSize;
-
-    /** Pending data read futures which were evicted from the local cache before completion. */
-    private final Set<IgniteInternalFuture<byte[]>> pendingFuts;
-
-    /** Pending futures lock. */
-    private final Lock pendingFutsLock = new ReentrantLock();
-
-    /** Pending futures condition. */
-    private final Condition pendingFutsCond = pendingFutsLock.newCondition();
-
-    /** Closed flag. */
-    private boolean closed;
-
-    /** Number of blocks to prefetch asynchronously. */
-    private int prefetchBlocks;
-
-    /** Numbed of blocks that must be read sequentially before prefetch is triggered. */
-    private int seqReadsBeforePrefetch;
-
-    /** Bytes read. */
-    private long bytes;
-
-    /** Index of the previously read block. Initially it is set to -1 indicating that no reads has been made so far. */
-    private long prevBlockIdx = -1;
-
-    /** Amount of sequential reads performed. */
-    private int seqReads;
-
-    /** Time consumed on reading. */
-    private long time;
-
-    /** Local GGFs metrics. */
-    private final GridGgfsLocalMetrics metrics;
-
-    /**
-     * Constructs file output stream.
-     *
-     * @param ggfsCtx GGFS context.
-     * @param path Path to stored file.
-     * @param fileInfo File info to write binary data to.
-     * @param prefetchBlocks Number of blocks to prefetch.
-     * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
-     * @param secReader Optional secondary file system reader.
-     * @param metrics Local GGFS metrics.
-     */
-    GridGgfsInputStreamImpl(GridGgfsContext ggfsCtx, IgniteFsPath path, GridGgfsFileInfo fileInfo, int prefetchBlocks,
-        int seqReadsBeforePrefetch, @Nullable IgniteFsReader secReader, GridGgfsLocalMetrics metrics) {
-        assert ggfsCtx != null;
-        assert path != null;
-        assert fileInfo != null;
-        assert metrics != null;
-
-        this.path = path;
-        this.fileInfo = fileInfo;
-        this.prefetchBlocks = prefetchBlocks;
-        this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
-        this.secReader = secReader;
-        this.metrics = metrics;
-
-        meta = ggfsCtx.meta();
-        data = ggfsCtx.data();
-
-        log = ggfsCtx.kernalContext().log(IgniteFsInputStream.class);
-
-        maxLocCacheSize = (prefetchBlocks > 0 ? prefetchBlocks : 1) * 3 / 2;
-
-        locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f);
-
-        pendingFuts = new GridConcurrentHashSet<>(prefetchBlocks > 0 ? prefetchBlocks : 1);
-    }
-
-    /**
-     * Gets bytes read.
-     *
-     * @return Bytes read.
-     */
-    public synchronized long bytes() {
-        return bytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridGgfsFileInfo fileInfo() {
-        return fileInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read() throws IOException {
-        byte[] buf = new byte[1];
-
-        int read = read(buf, 0, 1);
-
-        if (read == -1)
-            return -1; // EOF.
-
-        return buf[0] & 0xFF; // Cast to int and cut to *unsigned* byte value.
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
-        int read = readFromStore(pos, b, off, len);
-
-        if (read != -1)
-            pos += read;
-
-        return read;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void seek(long pos) throws IOException {
-        if (pos < 0)
-            throw new IOException("Seek position cannot be negative: " + pos);
-
-
-        this.pos = pos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long position() throws IOException {
-        return pos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int available() throws IOException {
-        long l = fileInfo.length() - pos;
-
-        if (l < 0)
-            return 0;
-
-        if (l > Integer.MAX_VALUE)
-            return Integer.MAX_VALUE;
-
-        return (int)l;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
-        readFully(pos, buf, 0, buf.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-        for (int readBytes = 0; readBytes < len; ) {
-            int read = readFromStore(pos + readBytes, buf, off + readBytes, len - readBytes);
-
-            if (read == -1)
-                throw new EOFException("Failed to read stream fully (stream ends unexpectedly)" +
-                    "[pos=" + pos + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
-
-            readBytes += read;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-        return readFromStore(pos, buf, off, len);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("IfMayBeConditional")
-    @Override public synchronized byte[][] readChunks(long pos, int len) throws IOException {
-        // Readable bytes in the file, starting from the specified position.
-        long readable = fileInfo.length() - pos;
-
-        if (readable <= 0)
-            return EMPTY_CHUNKS;
-
-        long startTime = System.nanoTime();
-
-        if (readable < len)
-            len = (int)readable; // Truncate expected length to available.
-
-        assert len > 0;
-
-        bytes += len;
-
-        int start = (int)(pos / fileInfo.blockSize());
-        int end = (int)((pos + len - 1) / fileInfo.blockSize());
-
-        int chunkCnt = end - start + 1;
-
-        byte[][] chunks = new byte[chunkCnt][];
-
-        for (int i = 0; i < chunkCnt; i++) {
-            byte[] block = blockFragmentizerSafe(start + i);
-
-            int blockOff = (int)(pos % fileInfo.blockSize());
-            int blockLen = Math.min(len, block.length - blockOff);
-
-            // If whole block can be used as result, do not do array copy.
-            if (blockLen == block.length)
-                chunks[i] = block;
-            else {
-                // Only first or last block can have non-full data.
-                assert i == 0 || i == chunkCnt - 1;
-
-                chunks[i] = Arrays.copyOfRange(block, blockOff, blockOff + blockLen);
-            }
-
-            len -= blockLen;
-            pos += blockLen;
-        }
-
-        assert len == 0;
-
-        time += System.nanoTime() - startTime;
-
-        return chunks;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void close() throws IOException {
-        try {
-            if (secReader != null) {
-                // Close secondary input stream.
-                secReader.close();
-
-                // Ensuring local cache futures completion.
-                for (IgniteInternalFuture<byte[]> fut : locCache.values()) {
-                    try {
-                        fut.get();
-                    }
-                    catch (IgniteCheckedException ignore) {
-                        // No-op.
-                    }
-                }
-
-                // Ensuring pending evicted futures completion.
-                while (!pendingFuts.isEmpty()) {
-                    pendingFutsLock.lock();
-
-                    try {
-                        pendingFutsCond.await(100, TimeUnit.MILLISECONDS);
-                    }
-                    catch (InterruptedException ignore) {
-                        // No-op.
-                    }
-                    finally {
-                        pendingFutsLock.unlock();
-                    }
-                }
-
-                // Safety to ensure no orphaned data blocks exist in case file was concurrently deleted.
-               if (!meta.exists(fileInfo.id()))
-                    data.delete(fileInfo);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOError(e); // Something unrecoverable.
-        }
-        finally {
-            closed = true;
-
-            metrics.addReadBytesTime(bytes, time);
-
-            locCache.clear();
-        }
-    }
-
-    /**
-     * @param pos Position to start reading from.
-     * @param buf Data buffer to save read data to.
-     * @param off Offset in the buffer to write data from.
-     * @param len Length of the data to read from the stream.
-     * @return Number of actually read bytes.
-     * @throws IOException In case of any IO exception.
-     */
-    private int readFromStore(long pos, byte[] buf, int off, int len) throws IOException {
-        if (pos < 0)
-            throw new IllegalArgumentException("Read position cannot be negative: " + pos);
-
-        if (buf == null)
-            throw new NullPointerException("Destination buffer cannot be null.");
-
-        if (off < 0 || len < 0 || buf.length < len + off)
-            throw new IndexOutOfBoundsException("Invalid buffer boundaries " +
-                "[buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
-
-        if (len == 0)
-            return 0; // Fully read done: read zero bytes correctly.
-
-        // Readable bytes in the file, starting from the specified position.
-        long readable = fileInfo.length() - pos;
-
-        if (readable <= 0)
-            return -1; // EOF.
-
-        long startTime = System.nanoTime();
-
-        if (readable < len)
-            len = (int)readable; // Truncate expected length to available.
-
-        assert len > 0;
-
-        byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize());
-
-        // Skip bytes to expected position.
-        int blockOff = (int)(pos % fileInfo.blockSize());
-
-        len = Math.min(len, block.length - blockOff);
-
-        U.arrayCopy(block, blockOff, buf, off, len);
-
-        bytes += len;
-        time += System.nanoTime() - startTime;
-
-        return len;
-    }
-
-    /**
-     * Method to safely retrieve file block. In case if file block is missing this method will check file map
-     * and update file info. This may be needed when file that we are reading is concurrently fragmented.
-     *
-     * @param blockIdx Block index to read.
-     * @return Block data.
-     * @throws IOException If read failed.
-     */
-    private byte[] blockFragmentizerSafe(long blockIdx) throws IOException {
-        try {
-            try {
-                return block(blockIdx);
-            }
-            catch (IgniteFsCorruptedFileException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to fetch file block [path=" + path + ", fileInfo=" + fileInfo +
-                        ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']');
-
-                // This failure may be caused by file being fragmented.
-                if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
-                    GridGgfsFileInfo newInfo = meta.info(fileInfo.id());
-
-                    // File was deleted.
-                    if (newInfo == null)
-                        throw new IgniteFsFileNotFoundException("Failed to read file block (file was concurrently " +
-                                "deleted) [path=" + path + ", blockIdx=" + blockIdx + ']');
-
-                    fileInfo = newInfo;
-
-                    // Must clear cache as it may have failed futures.
-                    locCache.clear();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Updated input stream file info after block fetch failure [path=" + path
-                            + ", fileInfo=" + fileInfo + ']');
-
-                    return block(blockIdx);
-                }
-
-                throw new IOException(e.getMessage(), e);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * @param blockIdx Block index.
-     * @return File block data.
-     * @throws IOException If failed.
-     * @throws IgniteCheckedException If failed.
-     */
-    private byte[] block(long blockIdx) throws IOException, IgniteCheckedException {
-        assert blockIdx >= 0;
-
-        IgniteInternalFuture<byte[]> bytesFut = locCache.get(blockIdx);
-
-        if (bytesFut == null) {
-            if (closed)
-                throw new IOException("Stream is already closed: " + this);
-
-            seqReads = (prevBlockIdx != -1 && prevBlockIdx + 1 == blockIdx) ? ++seqReads : 0;
-
-            prevBlockIdx = blockIdx;
-
-            bytesFut = dataBlock(fileInfo, blockIdx);
-
-            assert bytesFut != null;
-
-            addLocalCacheFuture(blockIdx, bytesFut);
-        }
-
-        // Schedule the next block(s) prefetch.
-        if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) {
-            for (int i = 1; i <= prefetchBlocks; i++) {
-                // Ensure that we do not prefetch over file size.
-                if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length())
-                    break;
-                else if (locCache.get(blockIdx + i) == null)
-                    addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i));
-            }
-        }
-
-        byte[] bytes = bytesFut.get();
-
-        if (bytes == null)
-            throw new IgniteFsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " +
-                "[path=" + path + ", blockIdx=" + blockIdx + ']');
-
-        int blockSize = fileInfo.blockSize();
-
-        if (blockIdx == fileInfo.blocksCount() - 1)
-            blockSize = (int)(fileInfo.length() % blockSize);
-
-        // If part of the file was reserved for writing, but was not actually written.
-        if (bytes.length < blockSize)
-            throw new IOException("Inconsistent file's data block (incorrectly written?)" +
-                " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length +
-                ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() +
-                ", fileLen=" + fileInfo.length() + ']');
-
-        return bytes;
-    }
-
-    /**
-     * Add local cache future.
-     *
-     * @param idx Block index.
-     * @param fut Future.
-     */
-    private void addLocalCacheFuture(long idx, IgniteInternalFuture<byte[]> fut) {
-        assert Thread.holdsLock(this);
-
-        if (!locCache.containsKey(idx)) {
-            if (locCache.size() == maxLocCacheSize) {
-                final IgniteInternalFuture<byte[]> evictFut = locCache.remove(locCache.keySet().iterator().next());
-
-                if (!evictFut.isDone()) {
-                    pendingFuts.add(evictFut);
-
-                    evictFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<byte[]>>() {
-                        @Override public void apply(IgniteInternalFuture<byte[]> t) {
-                            pendingFuts.remove(evictFut);
-
-                            pendingFutsLock.lock();
-
-                            try {
-                                pendingFutsCond.signalAll();
-                            }
-                            finally {
-                                pendingFutsLock.unlock();
-                            }
-                        }
-                    });
-                }
-            }
-
-            locCache.put(idx, fut);
-        }
-    }
-
-    /**
-     * Get data block for specified block index.
-     *
-     * @param fileInfo File info.
-     * @param blockIdx Block index.
-     * @return Requested data block or {@code null} if nothing found.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable protected IgniteInternalFuture<byte[]> dataBlock(GridGgfsFileInfo fileInfo, long blockIdx) throws IgniteCheckedException {
-        return data.dataBlock(fileInfo, path, blockIdx, secReader);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridGgfsInputStreamImpl.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInvalidRangeException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInvalidRangeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInvalidRangeException.java
deleted file mode 100644
index c175890..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsInvalidRangeException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-
-/**
- * Internal exception thrown when attempted to update range that is no longer present
- * in file affinity map.
- */
-public class GridGgfsInvalidRangeException extends IgniteCheckedException {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * @param msg Error message.
-     */
-    public GridGgfsInvalidRangeException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * @param cause Error cause.
-     */
-    public GridGgfsInvalidRangeException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
deleted file mode 100644
index d3f74bf..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.ignitefs.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.fs.common.*;
-import org.apache.ignite.internal.processors.closure.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * GGFS IPC handler.
- */
-class GridGgfsIpcHandler implements GridGgfsServerHandler {
-    /** For test purposes only. */
-    @SuppressWarnings("UnusedDeclaration")
-    private static boolean errWrite;
-
-    /** Kernal context. */
-    private final GridKernalContext ctx;
-
-    /** Log. */
-    private IgniteLogger log;
-
-    /** Buffer size. */
-    private final int bufSize; // Buffer size. Must not be less then file block size.
-
-    /** Ggfs instance for this handler. */
-    private GridGgfsEx ggfs;
-
-    /** Resource ID generator. */
-    private AtomicLong rsrcIdGen = new AtomicLong();
-
-    /** Stopping flag. */
-    private volatile boolean stopping;
-
-    /**
-     * Constructs GGFS IPC handler.
-     *
-     * @param ggfsCtx Context.
-     */
-    GridGgfsIpcHandler(GridGgfsContext ggfsCtx) {
-        assert ggfsCtx != null;
-
-        ctx = ggfsCtx.kernalContext();
-        ggfs = ggfsCtx.ggfs();
-
-        // Keep buffer size multiple of block size so no extra byte array copies is performed.
-        bufSize = ggfsCtx.configuration().getBlockSize() * 2;
-
-        log = ctx.log(GridGgfsIpcHandler.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteCheckedException {
-        stopping = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClosed(GridGgfsClientSession ses) {
-        Iterator<Closeable> it = ses.registeredResources();
-
-        while (it.hasNext()) {
-            Closeable stream = it.next();
-
-            try {
-                stream.close();
-            }
-            catch (IOException e) {
-                U.warn(log, "Failed to close opened stream on client close event (will continue) [ses=" + ses +
-                    ", stream=" + stream + ']', e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<GridGgfsMessage> handleAsync(final GridGgfsClientSession ses,
-        final GridGgfsMessage msg, DataInput in) {
-        try {
-            // Even if will be closed right after this call, response write error will be ignored.
-            if (stopping)
-                return null;
-
-            final GridGgfsIpcCommand cmd = msg.command();
-
-            IgniteInternalFuture<GridGgfsMessage> fut;
-
-            switch (cmd) {
-                // Execute not-blocking command synchronously in worker thread.
-                case WRITE_BLOCK:
-                case MAKE_DIRECTORIES:
-                case LIST_FILES:
-                case LIST_PATHS: {
-                    GridGgfsMessage res = execute(ses, cmd, msg, in);
-
-                    fut = res == null ? null : new GridFinishedFuture<>(ctx, res);
-
-                    break;
-                }
-
-                // Execute command asynchronously in user's pool.
-                default: {
-                    fut = ctx.closure().callLocalSafe(new GridPlainCallable<GridGgfsMessage>() {
-                        @Override public GridGgfsMessage call() throws Exception {
-                            // No need to pass data input for non-write-block commands.
-                            return execute(ses, cmd, msg, null);
-                        }
-                    }, GridClosurePolicy.GGFS_POOL);
-                }
-            }
-
-            // Pack result object into response format.
-            return fut;
-        }
-        catch (Exception e) {
-            return new GridFinishedFuture<>(ctx, e);
-        }
-    }
-
-    /**
-     * Execute GGFS command.
-     *
-     * @param ses Client connection session.
-     * @param cmd Command to execute.
-     * @param msg Message to process.
-     * @param in Data input in case of block write command.
-     * @return Command execution result.
-     * @throws Exception If failed.
-     */
-    private GridGgfsMessage execute(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg,
-        @Nullable DataInput in)
-        throws Exception {
-        switch (cmd) {
-            case HANDSHAKE:
-                return processHandshakeRequest((GridGgfsHandshakeRequest)msg);
-
-            case STATUS:
-                return processStatusRequest();
-
-            case EXISTS:
-            case INFO:
-            case PATH_SUMMARY:
-            case UPDATE:
-            case RENAME:
-            case DELETE:
-            case MAKE_DIRECTORIES:
-            case LIST_PATHS:
-            case LIST_FILES:
-            case SET_TIMES:
-            case AFFINITY:
-            case OPEN_READ:
-            case OPEN_CREATE:
-            case OPEN_APPEND:
-                return processPathControlRequest(ses, cmd, msg);
-
-            case CLOSE:
-            case READ_BLOCK:
-            case WRITE_BLOCK:
-                return processStreamControlRequest(ses, cmd, msg, in);
-
-            default:
-                throw new IgniteCheckedException("Unsupported IPC command: " + cmd);
-        }
-    }
-
-    /**
-     * Processes handshake request.
-     *
-     * @param req Handshake request.
-     * @return Response message.
-     * @throws IgniteCheckedException In case of handshake failure.
-     */
-    private GridGgfsMessage processHandshakeRequest(GridGgfsHandshakeRequest req) throws IgniteCheckedException {
-        if (!F.eq(ctx.gridName(), req.gridName()))
-            throw new IgniteCheckedException("Failed to perform handshake because actual Grid name differs from expected " +
-                "[expected=" + req.gridName() + ", actual=" + ctx.gridName() + ']');
-
-        if (!F.eq(ggfs.name(), req.ggfsName()))
-            throw new IgniteCheckedException("Failed to perform handshake because actual GGFS name differs from expected " +
-                "[expected=" + req.ggfsName() + ", actual=" + ggfs.name() + ']');
-
-        GridGgfsControlResponse res = new GridGgfsControlResponse();
-
-        ggfs.clientLogDirectory(req.logDirectory());
-
-        GridGgfsHandshakeResponse handshake = new GridGgfsHandshakeResponse(ggfs.name(), ggfs.proxyPaths(),
-            ggfs.groupBlockSize(), ggfs.globalSampling());
-
-        res.handshake(handshake);
-
-        return res;
-    }
-
-    /**
-     * Processes status request.
-     *
-     * @return Status response.
-     * @throws IgniteCheckedException If failed.
-     */
-    private GridGgfsMessage processStatusRequest() throws IgniteCheckedException {
-        IgfsStatus status = ggfs.globalSpace();
-
-        GridGgfsControlResponse res = new GridGgfsControlResponse();
-
-        res.status(status);
-
-        return res;
-    }
-
-    /**
-     * Processes path control request: runs the GGFS operation selected by {@code cmd} and wraps its result.
-     *
-     * @param ses Session; streams opened by the {@code OPEN_*} commands are registered in it.
-     * @param cmd Command to execute.
-     * @param msg Message; cast to {@link GridGgfsPathControlRequest}.
-     * @return Response message.
-     * @throws IgniteCheckedException If failed; wraps any {@link IgniteException} thrown while executing the command.
-     */
-    private GridGgfsMessage processPathControlRequest(GridGgfsClientSession ses, GridGgfsIpcCommand cmd,
-        GridGgfsMessage msg) throws IgniteCheckedException {
-        GridGgfsPathControlRequest req = (GridGgfsPathControlRequest)msg;
-
-        if (log.isDebugEnabled())
-            log.debug("Processing path control request [ggfsName=" + ggfs.name() + ", req=" + req + ']');
-
-        GridGgfsControlResponse res = new GridGgfsControlResponse();
-
-        try {
-            switch (cmd) {
-                case EXISTS:
-                    res.response(ggfs.exists(req.path()));
-
-                    break;
-
-                case INFO:
-                    res.response(ggfs.info(req.path()));
-
-                    break;
-
-                case PATH_SUMMARY:
-                    res.response(ggfs.summary(req.path()));
-
-                    break;
-
-                case UPDATE:
-                    res.response(ggfs.update(req.path(), req.properties()));
-
-                    break;
-
-                case RENAME:
-                    ggfs.rename(req.path(), req.destinationPath());
-
-                    res.response(true);
-
-                    break;
-
-                case DELETE:
-                    res.response(ggfs.delete(req.path(), req.flag()));
-
-                    break;
-
-                case MAKE_DIRECTORIES:
-                    ggfs.mkdirs(req.path(), req.properties());
-
-                    res.response(true);
-
-                    break;
-
-                case LIST_PATHS:
-                    res.paths(ggfs.listPaths(req.path()));
-
-                    break;
-
-                case LIST_FILES:
-                    res.files(ggfs.listFiles(req.path()));
-
-                    break;
-
-                case SET_TIMES:
-                    ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
-
-                    res.response(true);
-
-                    break;
-
-                case AFFINITY:
-                    res.locations(ggfs.affinity(req.path(), req.start(), req.length()));
-
-                    break;
-
-                case OPEN_READ: {
-                    GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? ggfs.open(req.path(), bufSize) :
-                        ggfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
-
-                    long streamId = registerResource(ses, ggfsIn);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Opened GGFS input stream for file read [ggfsName=" + ggfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
-
-                    GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null,
-                        ggfsIn.fileInfo().modificationTime());
-
-                    res.response(new GridGgfsInputStreamDescriptor(streamId, info.length()));
-
-                    break;
-                }
-
-                case OPEN_CREATE: {
-                    long streamId = registerResource(ses, ggfs.create(
-                        req.path(),       // Path.
-                        bufSize,          // Buffer size.
-                        req.flag(),       // Overwrite if exists.
-                        affinityKey(req), // Affinity key based on replication factor.
-                        req.replication(),// Replication factor.
-                        req.blockSize(),  // Block size.
-                        req.properties()  // File properties.
-                    ));
-
-                    if (log.isDebugEnabled())
-                        log.debug("Opened GGFS output stream for file create [ggfsName=" + ggfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
-
-                    res.response(streamId);
-
-                    break;
-                }
-
-                case OPEN_APPEND: {
-                    long streamId = registerResource(ses, ggfs.append(
-                        req.path(),        // Path.
-                        bufSize,           // Buffer size.
-                        req.flag(),        // Create if absent.
-                        req.properties()   // File properties.
-                    ));
-
-                    if (log.isDebugEnabled())
-                        log.debug("Opened GGFS output stream for file append [ggfsName=" + ggfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
-
-                    res.response(streamId);
-
-                    break;
-                }
-
-                default:
-                    assert false : "Unhandled path control request command: " + cmd;
-
-                    break;
-            }
-        }
-        catch (IgniteException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Finished processing path control request [ggfsName=" + ggfs.name() + ", req=" + req +
-                ", res=" + res + ']');
-
-        return res;
-    }
-
-    /**
-     * Processes stream control request ({@code CLOSE}, {@code READ_BLOCK} or {@code WRITE_BLOCK}).
-     *
-     * @param ses Session the stream resource is looked up in by the request's stream ID.
-     * @param cmd Command.
-     * @param msg Message; cast to {@link GridGgfsStreamControlRequest}.
-     * @param in Data input the block payload is transferred from on {@code WRITE_BLOCK}.
-     * @return Response message, or {@code null} after a successful {@code WRITE_BLOCK} (no response needed).
-     * @throws IgniteCheckedException If failed, e.g. no resource is registered under the requested stream ID.
-     * @throws IOException If failed, e.g. closing the resource on {@code CLOSE} threw it.
-     */
-    private GridGgfsMessage processStreamControlRequest(GridGgfsClientSession ses, GridGgfsIpcCommand cmd,
-        GridGgfsMessage msg, DataInput in) throws IgniteCheckedException, IOException {
-        GridGgfsStreamControlRequest req = (GridGgfsStreamControlRequest)msg;
-
-        Long rsrcId = req.streamId();
-
-        GridGgfsControlResponse resp = new GridGgfsControlResponse();
-
-        switch (cmd) {
-            case CLOSE: {
-                Closeable res = resource(ses, rsrcId);
-
-                if (log.isDebugEnabled())
-                    log.debug("Requested to close resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId +
-                        ", res=" + res + ']');
-
-                if (res == null)
-                    throw new IgniteCheckedException("Resource to close not found: " + rsrcId);
-
-                try {
-                    res.close();
-                }
-                catch (IOException e) {
-                    // Unwrap OutOfSpaceException if there is one; otherwise the original exception is rethrown.
-                    IgniteFsOutOfSpaceException space = X.cause(e, IgniteFsOutOfSpaceException.class);
-
-                    if (space != null)
-                        throw space;
-
-                    throw e;
-                }
-
-                boolean success = ses.unregisterResource(rsrcId, res);
-
-                assert success : "Failed to unregister resource [ggfsName=" + ggfs.name() + ", rsrcId=" + rsrcId +
-                    ", res=" + res + ']';
-
-                if (log.isDebugEnabled())
-                    log.debug("Closed GGFS stream [ggfsName=" + ggfs.name() + ", streamId=" + rsrcId +
-                        ", ses=" + ses + ']');
-
-                resp.response(true);
-
-                break;
-            }
-
-            case READ_BLOCK: {
-                long pos = req.position();
-                int size = req.length();
-
-                GridGgfsInputStreamAdapter ggfsIn = (GridGgfsInputStreamAdapter)resource(ses, rsrcId);
-
-                if (ggfsIn == null)
-                    throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId);
-
-                byte[][] chunks = ggfsIn.readChunks(pos, size);
-
-                resp.response(chunks);
-
-                // Calculate number of read bytes without iterating over all chunks.
-                // len = len(first) + (n - 2) * len(block) + len(last), where len(block) is taken from chunks[1].
-                int len = 0;
-
-                if (chunks.length > 0)
-                    len += chunks[0].length;
-
-                if (chunks.length > 1)
-                    len += chunks[chunks.length - 1].length;
-
-                if (chunks.length > 2)
-                    len += chunks[1].length * (chunks.length - 2);
-
-                resp.length(len);
-
-                break;
-            }
-
-            case WRITE_BLOCK: {
-                assert rsrcId != null : "Missing stream ID";
-
-                IgniteFsOutputStream out = (IgniteFsOutputStream)resource(ses, rsrcId);
-
-                if (out == null)
-                    throw new IgniteCheckedException("Output stream not found (already closed?): " + rsrcId);
-
-                int writeLen = req.length();
-
-                try {
-                    out.transferFrom(in, writeLen);
-
-                    if (errWrite)
-                        throw new IOException("Failed to write data to server (test).");
-
-                    // No response needed.
-                    return null;
-                }
-                catch (IOException e) {
-                    resp.error(rsrcId, e.getMessage());
-
-                    break;
-                }
-            }
-
-            default:
-                assert false;
-
-                break;
-        }
-
-        return resp;
-    }
-
-    /**
-     * @param req Path control request.
-     * @return Affinity key obtained from {@code ggfs.nextAffinityKey()} if {@code req.colocate()} is {@code true},
-     *      {@code null} otherwise. NOTE(review): was documented as "replication factor is 0" - confirm.
-     */
-    @Nullable private IgniteUuid affinityKey(GridGgfsPathControlRequest req) {
-        // Do not generate affinity key for replicated or near-only cache.
-        if (!req.colocate()) {
-            if (log.isDebugEnabled())
-                log.debug("Will not generate affinity key for path control request [ggfsName=" + ggfs.name() +
-                    ", req=" + req + ']');
-
-            return null;
-        }
-
-        IgniteUuid key = ggfs.nextAffinityKey();
-
-        if (log.isDebugEnabled())
-            log.debug("Generated affinity key for path control request [ggfsName=" + ggfs.name() +
-                ", req=" + req + ", key=" + key + ']');
-
-        return key;
-    }
-
-    /**
-     * Registers closeable resource within client session under the next ID taken from {@code rsrcIdGen}.
-     *
-     * @param ses IPC session.
-     * @param rsrc Resource to register.
-     * @return ID the resource was registered under.
-     */
-    private long registerResource(GridGgfsClientSession ses, Closeable rsrc) {
-        long rsrcId = rsrcIdGen.getAndIncrement();
-
-        boolean registered = ses.registerResource(rsrcId, rsrc);
-
-        assert registered : "Failed to register resource (duplicate id?): " + rsrcId;
-
-        return rsrcId;
-    }
-
-    /**
-     * Gets resource by resource ID from client session (delegates to {@code ses.resource(rsrcId)}).
-     *
-     * @param ses Session to get resource from.
-     * @param rsrcId Resource ID.
-     * @return Registered resource or {@code null} if not found.
-     */
-    @Nullable private Closeable resource(GridGgfsClientSession ses, Long rsrcId) {
-        return ses.resource(rsrcId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
deleted file mode 100644
index 0276457..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsJobImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.ignitefs.*;
-import org.apache.ignite.ignitefs.mapreduce.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.resources.*;
-
-import java.io.*;
-
-/**
- * GGFS job implementation: opens the file, resolves the split (if a resolver is set) and runs the wrapped job.
- */
-public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFsJob> {
-    /** Serial version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** Wrapped GGFS job. */
-    private IgniteFsJob job;
-
-    /** GGFS name. */
-    private String ggfsName;
-
-    /** GGFS path. */
-    private IgniteFsPath path;
-
-    /** Split start offset. */
-    private long start;
-
-    /** Split length. */
-    private long len;
-
-    /** Split resolver, may be {@code null}. */
-    private IgniteFsRecordResolver rslvr;
-
-    /** Injected grid. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** Injected logger. */
-    @LoggerResource
-    private IgniteLogger log;
-
-    /**
-     * @param job GGFS job.
-     * @param ggfsName GGFS name.
-     * @param path Split path.
-     * @param start Split start offset.
-     * @param len Split length.
-     * @param rslvr GGFS split resolver, or {@code null} to use the split as is.
-     */
-    public GridGgfsJobImpl(IgniteFsJob job, String ggfsName, IgniteFsPath path, long start, long len,
-        IgniteFsRecordResolver rslvr) {
-        this.job = job;
-        this.ggfsName = ggfsName;
-        this.path = path;
-        this.start = start;
-        this.len = len;
-        this.rslvr = rslvr;
-    }
-
-    /** {@inheritDoc} Returns {@code null} if the resolver is set and yields no split. */
-    @Override public Object execute() {
-        IgniteFs fs = ignite.fileSystem(ggfsName);
-
-        try (IgniteFsInputStream in = fs.open(path)) {
-            IgniteFsFileRange split = new IgniteFsFileRange(path, start, len);
-
-            if (rslvr != null) {
-                split = rslvr.resolveRecords(fs, in, split);
-
-                if (split == null) {
-                    log.warning("No data found for split on local node after resolver is applied " +
-                        "[ggfsName=" + ggfsName + ", path=" + path + ", start=" + start + ", len=" + len + ']');
-
-                    return null;
-                }
-            }
-
-            in.seek(split.start());
-
-            return job.execute(fs, new IgniteFsFileRange(path, split.start(), split.length()), in);
-        }
-        catch (IOException e) {
-            throw new IgniteException("Failed to execute GGFS job for file split [ggfsName=" + ggfsName +
-                ", path=" + path + ", start=" + start + ", len=" + len + ']', e);
-        }
-    }
-
-    /** {@inheritDoc} Delegates to the wrapped job. */
-    @Override public void cancel() {
-        job.cancel();
-    }
-
-    /** {@inheritDoc} Returns the wrapped job. */
-    @Override public IgniteFsJob userObject() {
-        return job;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsListingEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsListingEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsListingEntry.java
deleted file mode 100644
index ef328ee..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsListingEntry.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Directory listing entry.
- */
-public class GridGgfsListingEntry implements Externalizable {
-    /** Serial version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** File id. */
-    private IgniteUuid fileId;
-
-    /** File affinity key. */
-    private IgniteUuid affKey;
-
-    /** Positive block size if file, 0 if directory. */
-    private int blockSize;
-
-    /** File length. */
-    private long len;
-
-    /** Last access time. */
-    private long accessTime;
-
-    /** Last modification time. */
-    private long modificationTime;
-
-    /** File properties. */
-    private Map<String, String> props;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridGgfsListingEntry() {
-        // No-op.
-    }
-
-    /**
-     * @param fileInfo File info to construct listing entry from; block size and length are copied for files only.
-     */
-    public GridGgfsListingEntry(GridGgfsFileInfo fileInfo) {
-        fileId = fileInfo.id();
-        affKey = fileInfo.affinityKey();
-
-        if (fileInfo.isFile()) {
-            blockSize = fileInfo.blockSize();
-            len = fileInfo.length();
-        }
-
-        props = fileInfo.properties();
-        accessTime = fileInfo.accessTime();
-        modificationTime = fileInfo.modificationTime();
-    }
-
-    /** Creates a copy of the given entry with updated length, access time and modification time.
-     * @param entry Entry to copy file ID, affinity key, block size and properties from.
-     * @param len New length.
-     * @param accessTime New last access time.
-     * @param modificationTime New last modification time.
-     */
-    public GridGgfsListingEntry(GridGgfsListingEntry entry, long len, long accessTime, long modificationTime) {
-        fileId = entry.fileId;
-        affKey = entry.affKey;
-        blockSize = entry.blockSize;
-        props = entry.props;
-        this.accessTime = accessTime;
-        this.modificationTime = modificationTime;
-
-        this.len = len;
-    }
-
-    /**
-     * @return Entry file ID.
-     */
-    public IgniteUuid fileId() {
-        return fileId;
-    }
-
-    /**
-     * @return File affinity key, if specified.
-     */
-    public IgniteUuid affinityKey() {
-        return affKey;
-    }
-
-    /**
-     * @return {@code True} if entry represents file (block size is positive).
-     */
-    public boolean isFile() {
-        return blockSize > 0;
-    }
-
-    /**
-     * @return {@code True} if entry represents directory (block size is {@code 0}).
-     */
-    public boolean isDirectory() {
-        return blockSize == 0;
-    }
-
-    /**
-     * @return Block size.
-     */
-    public int blockSize() {
-        return blockSize;
-    }
-
-    /**
-     * @return Length.
-     */
-    public long length() {
-        return len;
-    }
-
-    /**
-     * @return Last access time.
-     */
-    public long accessTime() {
-        return accessTime;
-    }
-
-    /**
-     * @return Last modification time.
-     */
-    public long modificationTime() {
-        return modificationTime;
-    }
-
-    /**
-     * @return Properties map.
-     */
-    public Map<String, String> properties() {
-        return props;
-    }
-
-    /** {@inheritDoc} NOTE(review): {@code affKey} is not written here - confirm this is intended. */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, fileId);
-        out.writeInt(blockSize);
-        out.writeLong(len);
-        U.writeStringMap(out, props);
-        out.writeLong(accessTime);
-        out.writeLong(modificationTime);
-    }
-
-    /** {@inheritDoc} Reads fields in the order {@code writeExternal} writes them; {@code affKey} is not restored. */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        fileId = U.readGridUuid(in);
-        blockSize = in.readInt();
-        len = in.readLong();
-        props = U.readStringMap(in);
-        accessTime = in.readLong();
-        modificationTime = in.readLong();
-    }
-
-    /** {@inheritDoc} Entries are equal if their file IDs are equal; other fields are not compared. */
-    @Override public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof GridGgfsListingEntry)) return false;
-
-        GridGgfsListingEntry that = (GridGgfsListingEntry)o;
-
-        return fileId.equals(that.fileId);
-    }
-
-    /** {@inheritDoc} Uses the file ID only, consistently with {@code equals}. */
-    @Override public int hashCode() {
-        return fileId.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridGgfsListingEntry.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsLocalMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsLocalMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsLocalMetrics.java
deleted file mode 100644
index 0cfd645..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsLocalMetrics.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-
-/**
- * Value object holding all local GGFS metrics which cannot be determined using file system traversal.
- */
-public class GridGgfsLocalMetrics {
-    /** Block reads. First value - total reads, second value - reads delegated to the secondary file system. */
-    private volatile IgniteBiTuple<LongAdder, LongAdder> blocksRead;
-
-    /** Block writes. First value - total writes, second value - writes delegated to the secondary file system. */
-    private volatile IgniteBiTuple<LongAdder, LongAdder> blocksWritten;
-
-    /** Byte reads. First value - total bytes read, second value - consumed time. */
-    private volatile IgniteBiTuple<LongAdder, LongAdder> bytesRead;
-
-    /** Byte writes. First value - total bytes written, second value - consumed time. */
-    private volatile IgniteBiTuple<LongAdder, LongAdder> bytesWritten;
-
-    /** Number of files opened for read. */
-    private final LongAdder filesOpenedForRead = new LongAdder();
-
-    /** Number of files opened for write. */
-    private final LongAdder filesOpenedForWrite = new LongAdder();
-
-    /**
-     * Constructor. Initializes the block and byte counters by calling {@link #reset()}.
-     */
-    GridGgfsLocalMetrics() {
-        reset();
-    }
-
-    /**
-     * @return Read bytes.
-     */
-    long readBytes() {
-        return bytesRead.get1().longValue();
-    }
-
-    /**
-     * @return Read bytes time.
-     */
-    long readBytesTime() {
-        return bytesRead.get2().longValue();
-    }
-
-    /**
-     * Adds given numbers to read bytes and read time; both adds go to the tuple read once into a local.
-     *
-     * @param readBytes Number of bytes read.
-     * @param readTime Read time.
-     */
-    void addReadBytesTime(long readBytes, long readTime) {
-        IgniteBiTuple<LongAdder, LongAdder> bytesRead0 = bytesRead;
-
-        bytesRead0.get1().add(readBytes);
-        bytesRead0.get2().add(readTime);
-    }
-
-    /**
-     * @return Written bytes.
-     */
-    long writeBytes() {
-        return bytesWritten.get1().longValue();
-    }
-
-    /**
-     * @return Write bytes time.
-     */
-    long writeBytesTime() {
-        return bytesWritten.get2().longValue();
-    }
-
-    /**
-     * Adds given numbers to written bytes and write time; both adds go to the tuple read once into a local.
-     *
-     * @param writtenBytes Number of bytes written.
-     * @param writeTime Write time.
-     */
-    void addWrittenBytesTime(long writtenBytes, long writeTime) {
-        IgniteBiTuple<LongAdder, LongAdder> bytesWritten0 = bytesWritten;
-
-        bytesWritten0.get1().add(writtenBytes);
-        bytesWritten0.get2().add(writeTime);
-    }
-
-    /**
-     * @return Read blocks.
-     */
-    long readBlocks() {
-        return blocksRead.get1().longValue();
-    }
-
-    /**
-     * @return Blocks read from the secondary file system.
-     */
-    long readBlocksSecondary() {
-        return blocksRead.get2().longValue();
-    }
-
-    /**
-     * Adds given numbers to read blocks counters; both adds go to the tuple read once into a local.
-     *
-     * @param total Total number of blocks read.
-     * @param secondary Number of blocks read from secondary FS.
-     */
-    void addReadBlocks(int total, int secondary) {
-        IgniteBiTuple<LongAdder, LongAdder> blocksRead0 = blocksRead;
-
-        blocksRead0.get1().add(total);
-        blocksRead0.get2().add(secondary);
-    }
-
-    /**
-     * @return Written blocks.
-     */
-    long writeBlocks() {
-        return blocksWritten.get1().longValue();
-    }
-
-    /**
-     * @return Written blocks to secondary file system.
-     */
-    long writeBlocksSecondary() {
-        return blocksWritten.get2().longValue();
-    }
-
-    /**
-     * Adds given numbers to write blocks counters; both adds go to the tuple read once into a local.
-     *
-     * @param total Total number of blocks written.
-     * @param secondary Number of blocks written to secondary FS.
-     */
-    void addWriteBlocks(int total, int secondary) {
-        IgniteBiTuple<LongAdder, LongAdder> blocksWritten0 = blocksWritten;
-
-        blocksWritten0.get1().add(total);
-        blocksWritten0.get2().add(secondary);
-    }
-
-    /**
-     * Increment files opened for read.
-     */
-    void incrementFilesOpenedForRead() {
-        filesOpenedForRead.increment();
-    }
-
-    /**
-     * Decrement files opened for read.
-     */
-    void decrementFilesOpenedForRead() {
-        filesOpenedForRead.decrement();
-    }
-
-    /**
-     * @return Files opened for read.
-     */
-    int filesOpenedForRead() {
-        return filesOpenedForRead.intValue();
-    }
-
-    /**
-     * Increment files opened for write.
-     */
-    void incrementFilesOpenedForWrite() {
-        filesOpenedForWrite.increment();
-    }
-
-    /**
-     * Decrement files opened for write.
-     */
-    void decrementFilesOpenedForWrite() {
-        filesOpenedForWrite.decrement();
-    }
-
-    /**
-     * @return Files opened for write.
-     */
-    int filesOpenedForWrite() {
-        return filesOpenedForWrite.intValue();
-    }
-
-    /**
-     * Reset summary counters (block and byte tuples are replaced); the files-opened counters are left untouched.
-     */
-    void reset() {
-        blocksRead = F.t(new LongAdder(), new LongAdder());
-        blocksWritten = F.t(new LongAdder(), new LongAdder());
-        bytesRead = F.t(new LongAdder(), new LongAdder());
-        bytesWritten = F.t(new LongAdder(), new LongAdder());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1be8b773/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsManager.java
deleted file mode 100644
index ac6de56..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsManager.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.fs;
-
-import org.apache.ignite.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- * Abstract class for GGFS managers.
- */
-public abstract class GridGgfsManager {
-    /** GGFS context. */
-    protected GridGgfsContext ggfsCtx;
-
-    /** Logger. */
-    protected IgniteLogger log;
-
-    /** Starting flag, set to {@code true} by the first call to {@code start(GridGgfsContext)}. */
-    private AtomicBoolean starting = new AtomicBoolean();
-
-    /**
-     * Called when GGFS processor is started; asserts that it is called at most once per manager.
-     * @param ggfsCtx GGFS context, asserted to be non-null.
-     * @throws IgniteCheckedException If start failed.
-     */
-    public void start(GridGgfsContext ggfsCtx) throws IgniteCheckedException {
-        if (!starting.compareAndSet(false, true))
-            assert false : "Method start is called more than once for manager: " + this;
-
-        assert ggfsCtx != null;
-
-        this.ggfsCtx = ggfsCtx;
-
-        log = ggfsCtx.kernalContext().log(getClass());
-
-        start0();
-
-        if (log != null && log.isDebugEnabled())
-            log.debug(startInfo());
-    }
-
-    /**
-     * Stops manager; does nothing if the manager has never been started.
-     *
-     * @param cancel Cancel flag.
-     */
-    public final void stop(boolean cancel) {
-        if (!starting.get())
-            // Ignoring attempt to stop manager that has never been started.
-            return;
-
-        stop0(cancel);
-
-        if (log != null && log.isDebugEnabled())
-            log.debug(stopInfo());
-    }
-
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    public final void onKernalStart() throws IgniteCheckedException {
-        onKernalStart0();
-
-        if (log != null && log.isDebugEnabled())
-            log.debug(kernalStartInfo());
-    }
-
-    /**
-     * @param cancel Cancel flag.
-     */
-    public final void onKernalStop(boolean cancel) {
-        if (!starting.get())
-            // Ignoring attempt to stop manager that has never been started.
-            return;
-
-        onKernalStop0(cancel);
-
-        if (log != null && log.isDebugEnabled())
-            log.debug(kernalStopInfo());
-    }
-
-    /**
-     * Start manager implementation. No-op by default.
-     */
-    protected void start0() throws IgniteCheckedException {
-        // No-op by default.
-    }
-
-    /**
-     * Stop manager implementation. No-op by default.
-     *
-     * @param cancel Cancel flag.
-     */
-    protected void stop0(boolean cancel) {
-        // No-op by default.
-    }
-
-    /**
-     * @throws IgniteCheckedException If failed.
-     */
-    protected void onKernalStart0() throws IgniteCheckedException {
-        // No-op.
-    }
-
-    /**
-     * @param cancel Cancel flag.
-     */
-    protected void onKernalStop0(boolean cancel) {
-        // No-op.
-    }
-
-    /**
-     * @return Start info. NOTE(review): the message says "Cache manager" although this is a GGFS manager.
-     */
-    protected String startInfo() {
-        return "Cache manager started: " + getClass().getSimpleName();
-    }
-
-    /**
-     * @return Stop info. NOTE(review): the message says "Cache manager" although this is a GGFS manager.
-     */
-    protected String stopInfo() {
-        return "Cache manager stopped: " + getClass().getSimpleName();
-    }
-
-    /**
-     * @return Kernal start info. NOTE(review): the message says "Cache manager" although this is a GGFS manager.
-     */
-    protected String kernalStartInfo() {
-        return "Cache manager received onKernalStart() callback: " + getClass().getSimpleName();
-    }
-
-    /**
-     * @return Kernal stop info. NOTE(review): the message says "Cache manager" although this is a GGFS manager.
-     */
-    protected String kernalStopInfo() {
-        return "Cache manager received onKernalStop() callback: " + getClass().getSimpleName();
-    }
-}


Mime
View raw message