ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [14/62] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 16:13:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsAdapter.java
new file mode 100644
index 0000000..f37688a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsAdapter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Value holder implementing {@link IgfsMetrics}; can be serialized through {@link Externalizable}.
+ */
+public class IgfsMetricsAdapter implements IgfsMetrics, Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Used space on local node. */
+    private long locSpaceSize;
+
+    /** Maximum space. */
+    private long maxSpaceSize;
+
+    /** Secondary file system used space. */
+    private long secondarySpaceSize;
+
+    /** Number of directories. */
+    private int dirsCnt;
+
+    /** Number of files. */
+    private int filesCnt;
+
+    /** Number of files opened for read. */
+    private int filesOpenedForRead;
+
+    /** Number of files opened for write. */
+    private int filesOpenedForWrite;
+
+    /** Total blocks read. */
+    private long blocksReadTotal;
+
+    /** Total blocks read remotely. */
+    private long blocksReadRmt;
+
+    /** Total blocks written. */
+    private long blocksWrittenTotal;
+
+    /** Total blocks written remotely. */
+    private long blocksWrittenRmt;
+
+    /** Total bytes read. */
+    private long bytesRead;
+
+    /** Total bytes read time. */
+    private long bytesReadTime;
+
+    /** Total bytes written. */
+    private long bytesWritten;
+
+    /** Total bytes write time. */
+    private long bytesWriteTime;
+
+    /**
+     * Empty constructor required by {@link Externalizable}; fields are then filled in by {@code readExternal}.
+     */
+    public IgfsMetricsAdapter() {
+        // No-op.
+    }
+
+    /**
+     * @param locSpaceSize Used space on local node.
+     * @param maxSpaceSize Maximum space size.
+     * @param secondarySpaceSize Secondary space size.
+     * @param dirsCnt Number of directories.
+     * @param filesCnt Number of files.
+     * @param filesOpenedForRead Number of files opened for read.
+     * @param filesOpenedForWrite Number of files opened for write.
+     * @param blocksReadTotal Total blocks read.
+     * @param blocksReadRmt Total blocks read remotely.
+     * @param blocksWrittenTotal Total blocks written.
+     * @param blocksWrittenRmt Total blocks written remotely.
+     * @param bytesRead Total bytes read.
+     * @param bytesReadTime Total bytes read time.
+     * @param bytesWritten Total bytes written.
+     * @param bytesWriteTime Total bytes write time.
+     */
+    public IgfsMetricsAdapter(long locSpaceSize, long maxSpaceSize, long secondarySpaceSize, int dirsCnt,
+        int filesCnt, int filesOpenedForRead, int filesOpenedForWrite, long blocksReadTotal, long blocksReadRmt,
+        long blocksWrittenTotal, long blocksWrittenRmt, long bytesRead, long bytesReadTime, long bytesWritten,
+        long bytesWriteTime) {
+        this.locSpaceSize = locSpaceSize;
+        this.maxSpaceSize = maxSpaceSize;
+        this.secondarySpaceSize = secondarySpaceSize;
+        this.dirsCnt = dirsCnt;
+        this.filesCnt = filesCnt;
+        this.filesOpenedForRead = filesOpenedForRead;
+        this.filesOpenedForWrite = filesOpenedForWrite;
+        this.blocksReadTotal = blocksReadTotal;
+        this.blocksReadRmt = blocksReadRmt;
+        this.blocksWrittenTotal = blocksWrittenTotal;
+        this.blocksWrittenRmt = blocksWrittenRmt;
+        this.bytesRead = bytesRead;
+        this.bytesReadTime = bytesReadTime;
+        this.bytesWritten = bytesWritten;
+        this.bytesWriteTime = bytesWriteTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long localSpaceSize() {
+        return locSpaceSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long maxSpaceSize() {
+        return maxSpaceSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long secondarySpaceSize() {
+        return secondarySpaceSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int directoriesCount() {
+        return dirsCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int filesCount() {
+        return filesCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int filesOpenedForRead() {
+        return filesOpenedForRead;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int filesOpenedForWrite() {
+        return filesOpenedForWrite;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blocksReadTotal() {
+        return blocksReadTotal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blocksReadRemote() {
+        return blocksReadRmt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blocksWrittenTotal() {
+        return blocksWrittenTotal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blocksWrittenRemote() {
+        return blocksWrittenRmt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long bytesRead() {
+        return bytesRead;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long bytesReadTime() {
+        return bytesReadTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long bytesWritten() {
+        return bytesWritten;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long bytesWriteTime() {
+        return bytesWriteTime;
+    }
+
+    /** {@inheritDoc} Fields are written in exactly the order in which {@code readExternal} reads them. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(locSpaceSize);
+        out.writeLong(maxSpaceSize);
+        out.writeLong(secondarySpaceSize);
+        out.writeInt(dirsCnt);
+        out.writeInt(filesCnt);
+        out.writeInt(filesOpenedForRead);
+        out.writeInt(filesOpenedForWrite);
+        out.writeLong(blocksReadTotal);
+        out.writeLong(blocksReadRmt);
+        out.writeLong(blocksWrittenTotal);
+        out.writeLong(blocksWrittenRmt);
+        out.writeLong(bytesRead);
+        out.writeLong(bytesReadTime);
+        out.writeLong(bytesWritten);
+        out.writeLong(bytesWriteTime);
+    }
+
+    /** {@inheritDoc} Fields are read in exactly the order in which {@code writeExternal} writes them. */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        locSpaceSize = in.readLong();
+        maxSpaceSize = in.readLong();
+        secondarySpaceSize = in.readLong();
+        dirsCnt = in.readInt();
+        filesCnt = in.readInt();
+        filesOpenedForRead = in.readInt();
+        filesOpenedForWrite = in.readInt();
+        blocksReadTotal = in.readLong();
+        blocksReadRmt = in.readLong();
+        blocksWrittenTotal = in.readLong();
+        blocksWrittenRmt = in.readLong();
+        bytesRead = in.readLong();
+        bytesReadTime = in.readLong();
+        bytesWritten = in.readLong();
+        bytesWriteTime = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsMetricsAdapter.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java
new file mode 100644
index 0000000..94b5627
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsModeResolver.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Resolves the {@link IgfsMode} of a path from configured path prefixes, caching results per path.
+ */
+public class IgfsModeResolver {
+    /** Maximum size of map with cached path modes. */
+    private static final int MAX_PATH_CACHE = 1000;
+
+    /** Default mode. */
+    private final IgfsMode dfltMode;
+
+    /** Modes for particular paths. Ordered from longest to shortest. {@code null} if none were configured. */
+    private List<T2<IgfsPath, IgfsMode>> modes;
+
+    /** Cached modes per path. Created only when modes were configured. */
+    private Map<IgfsPath, IgfsMode> modesCache;
+
+    /** Cached children modes per path. Created only when modes were configured. */
+    private Map<IgfsPath, Set<IgfsMode>> childrenModesCache;
+
+    /**
+     * @param dfltMode Default IGFS mode, used for paths that match no configured prefix.
+     * @param modes List of configured modes, or {@code null} if none; the list is copied, not retained.
+     */
+    public IgfsModeResolver(IgfsMode dfltMode, @Nullable List<T2<IgfsPath, IgfsMode>> modes) {
+        assert dfltMode != null;
+
+        this.dfltMode = dfltMode;
+
+        if (modes != null) {
+            List<T2<IgfsPath, IgfsMode>> modes0 = new ArrayList<>(modes);
+
+            // Sort paths, longest first.
+            Collections.sort(modes0, new Comparator<Map.Entry<IgfsPath, IgfsMode>>() {
+                @Override public int compare(Map.Entry<IgfsPath, IgfsMode> o1,
+                    Map.Entry<IgfsPath, IgfsMode> o2) {
+                    return Integer.compare(o2.getKey().components().size(), o1.getKey().components().size());
+                }
+            });
+
+            this.modes = modes0;
+
+            modesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE);
+            childrenModesCache = new GridBoundedConcurrentLinkedHashMap<>(MAX_PATH_CACHE);
+        }
+    }
+
+    /**
+     * Resolves IGFS mode for the given path.
+     *
+     * @param path IGFS path.
+     * @return Mode of the longest configured prefix matching the path, or the default mode if none matches.
+     */
+    public IgfsMode resolveMode(IgfsPath path) {
+        assert path != null;
+
+        if (modes == null)
+            return dfltMode;
+
+        // Consult the bounded cache before scanning the configured prefixes.
+        IgfsMode mode = modesCache.get(path);
+
+        if (mode == null) {
+            for (T2<IgfsPath, IgfsMode> entry : modes) {
+                if (startsWith(path, entry.getKey())) {
+                    // As modes ordered from most specific to least specific first mode found is ours.
+                    mode = entry.getValue();
+
+                    break;
+                }
+            }
+
+            if (mode == null)
+                mode = dfltMode;
+
+            modesCache.put(path, mode);
+        }
+
+        return mode;
+    }
+
+    /**
+     * @param path Path.
+     * @return Unmodifiable set of all modes that children paths could have.
+     */
+    public Set<IgfsMode> resolveChildrenModes(IgfsPath path) {
+        assert path != null;
+
+        if (modes == null)
+            return Collections.singleton(dfltMode);
+
+        Set<IgfsMode> children = childrenModesCache.get(path);
+
+        if (children == null) {
+            Set<IgfsMode> found = new HashSet<>(IgfsMode.values().length, 1.0f);
+            IgfsMode pathDefault = dfltMode;
+
+            for (T2<IgfsPath, IgfsMode> child : modes) {
+                if (startsWith(path, child.getKey())) {
+                    pathDefault = child.getValue();
+
+                    break;
+                }
+                else if (startsWith(child.getKey(), path))
+                    found.add(child.getValue());
+            }
+
+            found.add(pathDefault);
+
+            // The cached set is shared between callers, so it is published as unmodifiable.
+            children = Collections.unmodifiableSet(found);
+            childrenModesCache.put(path, children);
+        }
+
+        return children;
+    }
+
+    /**
+     * @return Unmodifiable view of properly ordered modes prefixes
+     *  or {@code null} if no modes set.
+     */
+    @Nullable public List<T2<IgfsPath, IgfsMode>> modesOrdered() {
+        return modes != null ? Collections.unmodifiableList(modes) : null;
+    }
+
+    /**
+     * Check if path starts with prefix, comparing whole path components rather than characters.
+     *
+     * @param path Path.
+     * @param prefix Prefix.
+     * @return {@code true} if path starts with prefix, {@code false} if not.
+     */
+    private static boolean startsWith(IgfsPath path, IgfsPath prefix) {
+        List<String> pathComps = path.components();
+        List<String> prefixComps = prefix.components();
+
+        if (prefixComps.size() > pathComps.size())
+            return false;
+
+        for (int i = 0; i < prefixComps.size(); i++) {
+            // Kept from the previous implementation: a null prefix component ends the check as a match.
+            if (prefixComps.get(i) == null)
+                return true;
+
+            if (!pathComps.get(i).equals(prefixComps.get(i)))
+                return false;
+        }
+
+        // Every prefix component matched the corresponding path component.
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopHelper.java
new file mode 100644
index 0000000..7f0a1ca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+
+/**
+ * {@link IgfsHelper} stub: both configuration callbacks do nothing and no key is reported as a block key.
+ */
+public class IgfsNoopHelper implements IgfsHelper {
+    /** {@inheritDoc} */
+    @Override public void preProcessCacheConfiguration(CacheConfiguration cfg) {
+        // No-op: the configuration is left untouched.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateCacheConfiguration(CacheConfiguration cfg) throws IgniteCheckedException {
+        // No-op: nothing is checked, so this never throws.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isGgfsBlockKey(Object key) {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopProcessor.java
new file mode 100644
index 0000000..c14c21e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNoopProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * {@link IgfsProcessorAdapter} stub: it exposes no file systems or endpoints and creates no jobs.
+ */
+public class IgfsNoopProcessor extends IgfsProcessorAdapter {
+    /**
+     * Creates the no-op processor.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgfsNoopProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>>   ggfsCacheSize: 0");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteFs> ggfss() {
+        return Collections.<IgniteFs>emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteFs ggfs(@Nullable String name) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IpcServerEndpoint> endpoints(@Nullable String name) {
+        return Collections.<IpcServerEndpoint>emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ComputeJob createJob(IgfsJob job, @Nullable String ggfsName, IgfsPath path,
+        long start, long length, IgfsRecordResolver recRslv) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
new file mode 100644
index 0000000..6edc209
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Base output stream that buffers written bytes locally and hands them to subclasses in blocks.
+ */
+@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+abstract class IgfsOutputStreamAdapter extends IgfsOutputStream {
+    /** Path to file. */
+    protected final IgfsPath path;
+
+    /** Buffer size; buffered data is sent once it reaches this many bytes. */
+    private final int bufSize;
+
+    /** Flag for this stream open/closed state. */
+    private boolean closed;
+
+    /** Local buffer accumulating written data; {@code null} when nothing is buffered. */
+    private ByteBuffer buf;
+
+    /** Bytes written. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    protected long bytes;
+
+    /** Time consumed by write operations, in nanoseconds. */
+    protected long time;
+
+    /**
+     * Constructs file output stream.
+     *
+     * @param path Path to stored file.
+     * @param bufSize The size of the buffer to be used.
+     */
+    IgfsOutputStreamAdapter(IgfsPath path, int bufSize) {
+        assert path != null;
+        assert bufSize > 0;
+
+        this.path = path;
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * Gets number of written bytes.
+     *
+     * @return Current value of the {@code bytes} counter (this class itself never modifies it).
+     */
+    public long bytes() {
+        return bytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(int b) throws IOException {
+        checkClosed(null, 0);
+
+        long startTime = System.nanoTime();
+
+        if (buf == null)
+            buf = ByteBuffer.allocate(bufSize);
+
+        // Per the OutputStream contract only the low-order byte is written;
+        // the narrowing cast already discards the upper 24 bits.
+        buf.put((byte)b);
+
+        if (buf.position() >= bufSize)
+            sendData(true); // Send data to server.
+
+        time += System.nanoTime() - startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+        A.notNull(b, "b");
+
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+                ", length=" + len + ']');
+        }
+
+        checkClosed(null, 0);
+
+        if (len == 0)
+            return; // Done.
+
+        long startTime = System.nanoTime();
+
+        if (buf == null && len >= bufSize) {
+            // Do not allocate and copy byte buffer if will send data immediately.
+            buf = ByteBuffer.wrap(b, off, len);
+
+            sendData(false);
+        }
+        else {
+            if (buf == null)
+                buf = ByteBuffer.allocate(bufSize); // len < bufSize on this path, so the data fits.
+
+            if (buf.remaining() < len)
+                // Expand buffer capacity, if remaining size is less then data size.
+                buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+
+            assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer " +
+                "capacity [len=" + len + ", buf.remaining=" + buf.remaining() + ']';
+
+            buf.put(b, off, len);
+
+            if (buf.position() >= bufSize)
+                sendData(true); // Send data to server.
+        }
+
+        // Both paths end up here so that directly sent (wrapped) writes are accounted for as well.
+        time += System.nanoTime() - startTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
+        checkClosed(in, len);
+
+        long startTime = System.nanoTime();
+
+        // Send all IPC data from the local buffer before streaming.
+        if (buf != null && buf.position() > 0)
+            sendData(true);
+
+        try {
+            storeDataBlocks(in, len);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e.getMessage(), e);
+        }
+
+        time += System.nanoTime() - startTime;
+    }
+
+    /**
+     * Flushes this output stream and forces any buffered output bytes to be written out.
+     *
+     * @exception IOException If the stream is closed or an I/O error occurs.
+     */
+    @Override public synchronized void flush() throws IOException {
+        checkClosed(null, 0);
+
+        // Send all IPC data from the local buffer.
+        if (buf != null && buf.position() > 0)
+            sendData(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final synchronized void close() throws IOException {
+        // Do nothing if stream is already closed.
+        if (closed)
+            return;
+
+        try {
+            // Send all IPC data from the local buffer.
+            try {
+                flush();
+            }
+            finally {
+                onClose(); // "onClose()" routine must be invoked anyway!
+            }
+        }
+        finally {
+            // Mark this stream closed AFTER flush.
+            closed = true;
+        }
+    }
+
+    /**
+     * Store data blocks in file. Note! If file concurrently deleted we'll get lost blocks.
+     *
+     * @param data Data to store.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException In case of IO exception.
+     */
+    protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException;
+
+    /**
+     * Store data blocks in file reading appropriate number of bytes from given data input.
+     *
+     * @param in Data input to read from.
+     * @param len Data length to store.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException In case of IO exception.
+     */
+    protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException;
+
+    /**
+     * Close callback. It will be called only once in synchronized section.
+     *
+     * @throws IOException If failed.
+     */
+    protected void onClose() throws IOException {
+        // No-op.
+    }
+
+    /**
+     * @param in Data input to skip {@code len} bytes from before failing, or {@code null} if there is none.
+     * @param len Number of bytes to skip from {@code in} when the stream is closed.
+     * @throws IOException If this stream is closed.
+     */
+    private void checkClosed(@Nullable DataInput in, int len) throws IOException {
+        assert Thread.holdsLock(this);
+
+        if (closed) {
+            // Must read data from stream before throwing exception.
+            if (in != null)
+                in.skipBytes(len);
+
+            throw new IOException("Stream has been closed: " + this);
+        }
+    }
+
+    /**
+     * Send all local-buffered data to server, then drop the buffer even if the store attempt failed.
+     *
+     * @param flip Whether to flip buffer before sending; {@code false} when it wraps a caller's byte array.
+     * @throws IOException In case of IO exception.
+     */
+    private void sendData(boolean flip) throws IOException {
+        assert Thread.holdsLock(this);
+
+        try {
+            if (flip)
+                buf.flip();
+
+            storeDataBlock(buf);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to store data into file: " + path, e);
+        }
+        finally {
+            buf = null; // A flipped or caller-owned buffer must never be reused after a failure.
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsOutputStreamAdapter.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
new file mode 100644
index 0000000..4436191
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.igfs.IgfsMode.*;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
+    /** Upper bound, in file blocks, that {@code optimizeBufferSize} applies to the buffer size. */
+    private static final int MAX_BLOCKS_CNT = 16;
+
+    /** GGFS context. */
+    private IgfsContext ggfsCtx;
+
+    /** Meta info manager, taken from the GGFS context. */
+    private final IgfsMetaManager meta;
+
+    /** Data manager, taken from the GGFS context. */
+    private final IgfsDataManager data;
+
+    /** Info of the file being written; replaced by the updated info once written space is reserved in flush(). */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private IgfsFileInfo fileInfo;
+
+    /** Parent ID, passed to the parent listing update on close. */
+    private final IgniteUuid parentId;
+
+    /** File name (last component of the stream path). */
+    private final String fileName;
+
+    /** Number of bytes accepted since the last space reservation; reset to zero by flush(). */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private long space;
+
+    /** Buffer accumulating data that does not fill a whole block yet. */
+    private byte[] remainder;
+
+    /** Data length in remainder. */
+    private int remainderDataLen;
+
+    /** Write completion future obtained from the data manager when the stream is created. */
+    private final IgniteInternalFuture<Boolean> writeCompletionFut;
+
+    /** GGFS mode. */
+    private final IgfsMode mode;
+
+    /** File worker batch; may be {@code null} only in {@code PRIMARY} mode. */
+    private final IgfsFileWorkerBatch batch;
+
+    /** Ensures that the onClose() routine is executed no more than once. */
+    private final AtomicBoolean onCloseGuard = new AtomicBoolean();
+
+    /** Local GGFS metrics. */
+    private final IgfsLocalMetrics metrics;
+
+    /** Affinity range written by this output stream; {@code null} when no range is tracked. */
+    private IgfsFileAffinityRange streamRange;
+
+    /**
+     * Creates an output stream for a file that has already been locked for writing.
+     *
+     * @param ggfsCtx GGFS context.
+     * @param path Path to stored file.
+     * @param fileInfo File info to write binary data to; must describe a file and carry a lock ID.
+     * @param parentId Parent ID, passed to the parent listing update on close.
+     * @param bufSize Requested buffer size; it is adjusted against the file's block size.
+     * @param mode GGFS mode; must not be {@code PROXY}.
+     * @param batch Secondary file system batch; may be {@code null} only in {@code PRIMARY} mode.
+     * @param metrics Local GGFS metrics.
+     * @throws IgniteCheckedException If stream creation failed.
+     */
+    IgfsOutputStreamImpl(IgfsContext ggfsCtx, IgfsPath path, IgfsFileInfo fileInfo, IgniteUuid parentId,
+        int bufSize, IgfsMode mode, @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics)
+        throws IgniteCheckedException {
+        super(path, optimizeBufferSize(bufSize, fileInfo));
+
+        assert fileInfo != null;
+        assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+        assert mode != null && mode != PROXY;
+        assert mode == PRIMARY && batch == null || batch != null;
+        assert metrics != null;
+
+        // A missing lock ID means the file has not been locked for this writer.
+        if (fileInfo.lockId() == null)
+            throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+
+        this.ggfsCtx = ggfsCtx;
+        this.fileInfo = fileInfo;
+        this.parentId = parentId;
+        this.mode = mode;
+        this.batch = batch;
+        this.metrics = metrics;
+
+        meta = ggfsCtx.meta();
+        data = ggfsCtx.data();
+
+        streamRange = initialStreamRange(fileInfo);
+        fileName = path.name();
+
+        writeCompletionFut = data.writeStart(fileInfo);
+    }
+
+    /**
+     * Fits the buffer size into {@code [blockSize, MAX_BLOCKS_CNT * blockSize]}, block-aligned for empty files.
+     *
+     * @param bufSize Requested buffer size, must be positive.
+     * @param fileInfo File info; {@code null} or a non-positive block size leaves {@code bufSize} unchanged.
+     * @return Optimized buffer size.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private static int optimizeBufferSize(int bufSize, IgfsFileInfo fileInfo) {
+        assert bufSize > 0;
+
+        if (fileInfo == null)
+            return bufSize;
+
+        int blockSize = fileInfo.blockSize();
+
+        if (blockSize <= 0)
+            return bufSize;
+
+        if (bufSize <= blockSize)
+            // Optimize minimum buffer size to be equal file's block size.
+            return blockSize;
+
+        long maxBufSize = (long)blockSize * MAX_BLOCKS_CNT; // Long math: the int product may overflow.
+
+        if (bufSize > maxBufSize)
+            // There is no profit or optimization from larger buffers (limit fits int as it is below bufSize).
+            return (int)maxBufSize;
+
+        if (fileInfo.length() == 0)
+            // Make buffer size multiple of block size (optimized for new files).
+            return bufSize / blockSize * blockSize;
+
+        return bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+        int cnt = block.remaining();
+
+        preStoreDataBlocks(null, cnt);
+
+        int blockSize = fileInfo.blockSize();
+
+        // A full block is available: pass everything to the data manager. Otherwise only fill the remainder.
+        if (remainderDataLen + cnt >= blockSize) {
+            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
+                false, streamRange, batch);
+
+            remainderDataLen = remainder != null ? remainder.length : 0;
+        }
+        else {
+            if (remainder == null)
+                remainder = new byte[blockSize];
+            else if (remainder.length != blockSize) {
+                assert remainderDataLen == remainder.length;
+
+                byte[] grown = new byte[blockSize];
+
+                U.arrayCopy(remainder, 0, grown, 0, remainder.length);
+
+                remainder = grown;
+            }
+
+            block.get(remainder, remainderDataLen, cnt);
+
+            remainderDataLen += cnt;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+        preStoreDataBlocks(in, len);
+
+        int blockSize = fileInfo.blockSize();
+
+        // A full block is available: pass everything to the data manager. Otherwise only fill the remainder.
+        if (remainderDataLen + len >= blockSize) {
+            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
+                false, streamRange, batch);
+
+            remainderDataLen = remainder != null ? remainder.length : 0;
+        }
+        else {
+            if (remainder == null)
+                remainder = new byte[blockSize];
+            else if (remainder.length != blockSize) {
+                assert remainderDataLen == remainder.length;
+
+                byte[] grown = new byte[blockSize];
+
+                U.arrayCopy(remainder, 0, grown, 0, remainder.length);
+
+                remainder = grown;
+            }
+
+            in.readFully(remainder, remainderDataLen, len);
+
+            remainderDataLen += len;
+        }
+    }
+
+    /**
+     * Rethrows a failure reported by the write completion future (skipping the input first), then counts bytes.
+     *
+     * @param len Data length to be written.
+     */
+    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
+        // The future is expected to be done this early only because of an error; get() then rethrows it.
+        if (writeCompletionFut.isDone()) {
+            assert ((GridFutureAdapter<?>)writeCompletionFut).isFailed();
+
+            if (in != null)
+                in.skipBytes(len);
+
+            writeCompletionFut.get();
+        }
+
+        space += len;
+        bytes += len;
+    }
+
+    /**
+     * Flushes buffered data and the pending remainder, then reserves the newly written space in file meta info.
+     *
+     * @throws IOException If the file was concurrently deleted or the data could not be stored.
+     */
+    @Override public synchronized void flush() throws IOException {
+        boolean exists;
+
+        try {
+            exists = meta.exists(fileInfo.id());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOError(e); // Something unrecoverable.
+        }
+
+        if (!exists) {
+            onClose(true); // Run the close callback with the file already known to be deleted.
+
+            throw new IOException("File was concurrently deleted: " + path);
+        }
+
+        super.flush();
+
+        try {
+            if (remainder != null) {
+                data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+                    ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
+
+                remainder = null;
+                remainderDataLen = 0;
+            }
+
+            if (space > 0) {
+                IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
+                    new ReserveSpaceClosure(space, streamRange));
+
+                if (fileInfo0 == null)
+                    throw new IOException("File was concurrently deleted: " + path);
+                else
+                    fileInfo = fileInfo0;
+
+                streamRange = initialStreamRange(fileInfo); // Start a new range from the updated file info.
+
+                space = 0; // Everything accepted so far has been reserved.
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onClose() throws IOException {
+        onClose(false); // Deletion is not known here, so the callback checks file existence itself.
+    }
+
+    /**
+     * Close callback; must be called under the stream lock, and only the first call has any effect.
+     *
+     * @param deleted Whether we already know that the file was deleted.
+     * @throws IOException If closing failed or the file was concurrently deleted.
+     */
+    private void onClose(boolean deleted) throws IOException {
+        assert Thread.holdsLock(this);
+
+        if (onCloseGuard.compareAndSet(false, true)) {
+            // Notify backing secondary file system batch to finish (a batch is required in non-PRIMARY modes).
+            if (mode != PRIMARY) {
+                assert batch != null;
+
+                batch.finish();
+            }
+
+            // Ensure file existence; the meta lookup is skipped when deletion is already known.
+            boolean exists;
+
+            try {
+                exists = !deleted && meta.exists(fileInfo.id());
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOError(e); // Something unrecoverable.
+            }
+
+            if (exists) {
+                IOException err = null; // First failure; rethrown only after unlock and parent listing update.
+
+                try {
+                    data.writeClose(fileInfo);
+
+                    writeCompletionFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
+                }
+
+                metrics.addWrittenBytesTime(bytes, time);
+
+                // Await secondary file system processing to finish; its failure is kept only if none came earlier.
+                if (mode == DUAL_SYNC) {
+                    try {
+                        batch.await();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (err == null)
+                            err = new IOException("Failed to close secondary file system stream [path=" + path +
+                                ", fileInfo=" + fileInfo + ']', e);
+                    }
+                }
+
+                long modificationTime = System.currentTimeMillis();
+
+                try {
+                    meta.unlock(fileInfo, modificationTime);
+                }
+                catch (IgfsFileNotFoundException ignore) {
+                    data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
+
+                    throw new IOException("File was concurrently deleted: " + path);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOError(e); // Something unrecoverable.
+                }
+
+                meta.updateParentListingAsync(parentId, fileInfo.id(), fileName, bytes, modificationTime);
+
+                if (err != null)
+                    throw err;
+            }
+            else { // File is gone: await the secondary batch in DUAL_SYNC mode and always delete the data.
+                try {
+                    if (mode == DUAL_SYNC)
+                        batch.await();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to close secondary file system stream [path=" + path +
+                        ", fileInfo=" + fileInfo + ']', e);
+                }
+                finally {
+                    data.delete(fileInfo);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the zero-length affinity range this stream starts with, positioned at the first
+     * non-occupied block of the file.
+     *
+     * @param fileInfo File info to build initial range for.
+     * @return Affinity range, or {@code null} if ranges are not used for the file or no affinity key was given.
+     */
+    private IgfsFileAffinityRange initialStreamRange(IgfsFileInfo fileInfo) {
+        boolean fragmentizer = ggfsCtx.configuration().isFragmentizerEnabled();
+
+        // A range is built only with the fragmentizer enabled and local writes preferred for the file.
+        if (!fragmentizer || !Boolean.parseBoolean(fileInfo.properties().get(IgniteFs.PROP_PREFER_LOCAL_WRITES)))
+            return null;
+
+        int blockSize = fileInfo.blockSize();
+
+        // Round the current length up to a block boundary: offset of the first non-occupied block.
+        long firstFreeOff = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize;
+
+        // Offset of the preceding block (never negative); its affinity key is reused if we are on the same node.
+        long lastOff = Math.max(0L, firstFreeOff - fileInfo.blockSize());
+
+        IgfsFileMap map = fileInfo.fileMap();
+
+        IgniteUuid prevKey = map != null ? map.affinityKey(lastOff, false) : null;
+
+        IgniteUuid nextKey = data.nextAffinityKey(prevKey);
+
+        if (nextKey == null)
+            return null;
+
+        return new IgfsFileAffinityRange(firstFreeOff, firstFreeOff, nextKey);
+    }
+
+    /** {@inheritDoc} Built by {@code S.toString} for {@code IgfsOutputStreamImpl}. */
+    @Override public String toString() {
+        return S.toString(IgfsOutputStreamImpl.class, this);
+    }
+
+    /**
+     * Helper closure to reserve specified space and update file's length
+     */
+    @GridInternal
+    private static final class ReserveSpaceClosure implements IgniteClosure<IgfsFileInfo, IgfsFileInfo>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Space amount (bytes number) to increase file's length. */
+        private long space;
+
+        /** Affinity range for this particular update. */
+        private IgfsFileAffinityRange range;
+
+        /**
+         * Empty constructor required for {@link Externalizable}.
+         *
+         */
+        public ReserveSpaceClosure() {
+            // No-op.
+        }
+
+        /**
+         * Constructs the closure to reserve specified space and update file's length.
+         *
+         * @param space Space amount (bytes number) to increase file's length.
+         * @param range Affinity range specifying which part of file was colocated.
+         */
+        private ReserveSpaceClosure(long space, IgfsFileAffinityRange range) {
+            this.space = space;
+            this.range = range;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsFileInfo apply(IgfsFileInfo oldInfo) {
+            IgfsFileMap oldMap = oldInfo.fileMap();
+
+            IgfsFileMap newMap = new IgfsFileMap(oldMap);
+
+            newMap.addRange(range);
+
+            // Update file length.
+            IgfsFileInfo updated = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
+
+            updated.fileMap(newMap);
+
+            return updated;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(space);
+            out.writeObject(range);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            space = in.readLong();
+            range = (IgfsFileAffinityRange)in.readObject();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReserveSpaceClosure.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
new file mode 100644
index 0000000..c5f8d97
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Secondary file system properties together with the default and per-path GGFS modes.
+ */
+public class IgfsPaths implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Additional secondary file system properties. */
+    private Map<String, String> props;
+
+    /** Default GGFS mode. */
+    private IgfsMode dfltMode;
+
+    /** Path modes, may be {@code null}. */
+    private List<T2<IgfsPath, IgfsMode>> pathModes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgfsPaths() {
+        // No-op.
+    }
+
+    /**
+     * Creates the description from explicit values; the arguments are stored as passed.
+     *
+     * @param props Additional secondary file system properties.
+     * @param dfltMode Default GGFS mode.
+     * @param pathModes Path modes.
+     */
+    public IgfsPaths(Map<String, String> props, IgfsMode dfltMode, @Nullable List<T2<IgfsPath,
+        IgfsMode>> pathModes) {
+        this.pathModes = pathModes;
+        this.dfltMode = dfltMode;
+        this.props = props;
+    }
+
+    /**
+     * @return Secondary file system properties.
+     */
+    public Map<String, String> properties() {
+        return props;
+    }
+
+    /**
+     * @return Default GGFS mode.
+     */
+    public IgfsMode defaultMode() {
+        return dfltMode;
+    }
+
+    /**
+     * @return Path modes, or {@code null} if there are none.
+     */
+    @Nullable public List<T2<IgfsPath, IgfsMode>> pathModes() {
+        return pathModes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeStringMap(out, props);
+        U.writeEnum(out, dfltMode);
+
+        // Presence flag goes first; size and entries follow only for a non-null list.
+        out.writeBoolean(pathModes != null);
+
+        if (pathModes != null) {
+            out.writeInt(pathModes.size());
+
+            for (T2<IgfsPath, IgfsMode> e : pathModes) {
+                e.getKey().writeExternal(out);
+                U.writeEnum(out, e.getValue());
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        props = U.readStringMap(in);
+        dfltMode = IgfsMode.fromOrdinal(in.readByte());
+
+        // Presence flag: no path modes were written.
+        if (!in.readBoolean())
+            return;
+
+        int cnt = in.readInt();
+
+        pathModes = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++) {
+            IgfsPath path = new IgfsPath();
+            path.readExternal(in);
+
+            pathModes.add(new T2<>(path, IgfsMode.fromOrdinal(in.readByte())));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
new file mode 100644
index 0000000..8508844
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.IgniteSystemProperties.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+
+/**
+ * Fully operational Ignite file system processor.
+ */
+public class IgfsProcessor extends IgfsProcessorAdapter {
+    /** Random string used as the map key for a {@code null} GGFS name (see {@code maskName}). */
+    private static final String NULL_NAME = UUID.randomUUID().toString();
+
+    /** Closure extracting the GGFS instance from its context; used to build the {@code ggfss()} view. */
+    private static final IgniteClosure<IgfsContext,IgniteFs> CTX_TO_GGFS = new C1<IgfsContext, IgniteFs>() {
+        @Override public IgniteFs apply(IgfsContext ggfsCtx) {
+            return ggfsCtx.ggfs();
+        }
+    };
+
+    /** GGFS contexts keyed by masked GGFS name. */
+    private final ConcurrentMap<String, IgfsContext> ggfsCache =
+        new ConcurrentHashMap8<>();
+
+    /**
+     * @param ctx Kernal context, handed over to the adapter superclass.
+     */
+    public IgfsProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        // Daemon nodes do not start GGFS.
+        if (ctx.config().isDaemon())
+            return;
+
+        IgfsConfiguration[] cfgs = ctx.config().getGgfsConfiguration();
+
+        assert cfgs != null && cfgs.length > 0;
+
+        validateLocalGgfsConfigurations(cfgs);
+
+        for (IgfsConfiguration ggfsCfg : cfgs) {
+            // Every instance gets a context built from its own configuration object and a fresh set of managers.
+            IgfsContext ggfsCtx = new IgfsContext(ctx, new IgfsConfiguration(ggfsCfg), new IgfsMetaManager(),
+                new IgfsDataManager(), new IgfsServerManager(), new IgfsFragmentizerManager());
+
+            List<IgfsManager> mgrs = ggfsCtx.managers();
+
+            // Managers are started before the context is registered.
+            for (IgfsManager mgr : mgrs)
+                mgr.start(ggfsCtx);
+
+            String key = maskName(ggfsCfg.getName());
+
+            ggfsCache.put(key, ggfsCtx);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("GGFS processor started.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        // Unless the consistency check is switched off, run the GGFS check against every remote node.
+        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK))
+            for (ClusterNode rmtNode : ctx.discovery().remoteNodes())
+                checkGgfsOnRemoteNode(rmtNode);
+
+        for (IgfsContext ggfsCtx : ggfsCache.values())
+            for (IgfsManager mgr : ggfsCtx.managers())
+                mgr.onKernalStart();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // Shut down every registered GGFS instance.
+        for (IgfsContext ggfsCtx : ggfsCache.values()) {
+            if (log.isDebugEnabled())
+                log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName());
+
+            List<IgfsManager> mgrs = ggfsCtx.managers();
+
+            // Managers are stopped in the order opposite to the one they are listed (and started) in.
+            for (int i = mgrs.size() - 1; i >= 0; i--)
+                mgrs.get(i).stop(cancel);
+
+            // The file system itself is stopped after all of its managers.
+            ggfsCtx.ggfs().stop();
+        }
+
+        // Forget all stopped instances.
+        ggfsCache.clear();
+
+        if (log.isDebugEnabled())
+            log.debug("GGFS processor stopped.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        // Pass the callback to the managers of every registered GGFS instance.
+        for (IgfsContext ggfsCtx : ggfsCache.values()) {
+            if (log.isDebugEnabled())
+                log.debug("Stopping ggfs: " + ggfsCtx.configuration().getName());
+
+            List<IgfsManager> mgrs = ggfsCtx.managers();
+
+            // Walk the managers from the last one to the first one, i.e. in the order
+            // opposite to the one they are listed in.
+            for (int i = mgrs.size() - 1; i >= 0; i--)
+                mgrs.get(i).onKernalStop(cancel);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Finished executing GGFS processor onKernalStop() callback.");
+    }
+
+    /** {@inheritDoc} Prints the grid name and the number of entries in the GGFS context cache. */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> GGFS processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>>   ggfsCacheSize: " + ggfsCache.size());
+    }
+
+    /** {@inheritDoc} Returns an {@code F.viewReadOnly} view of the cached contexts mapped through CTX_TO_GGFS. */
+    @SuppressWarnings("unchecked")
+    @Override public Collection<IgniteFs> ggfss() {
+        return F.viewReadOnly(ggfsCache.values(), CTX_TO_GGFS);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public IgniteFs ggfs(@Nullable String name) {
+        IgfsContext found = ggfsCache.get(maskName(name));
+
+        return found != null ? found.ggfs() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Collection<IpcServerEndpoint> endpoints(@Nullable String name) {
+        IgfsContext found = ggfsCache.get(maskName(name));
+
+        return found != null ? found.server().endpoints() : Collections.<IpcServerEndpoint>emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ComputeJob createJob(IgfsJob job, @Nullable String ggfsName, IgfsPath path,
+        long start, long len, IgfsRecordResolver recRslv) {
+        return new IgfsJobImpl(job, ggfsName, path, start, len, recRslv); // Arguments are passed through as is.
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException {
+        super.addAttributes(attrs);
+
+        IgniteConfiguration gridCfg = ctx.config();
+
+        // Node doesn't have GGFS if it:
+        // is daemon;
+        // doesn't have configured GGFS;
+        // doesn't have configured caches.
+        if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getGgfsConfiguration()) ||
+            F.isEmpty(gridCfg.getCacheConfiguration()))
+            return;
+
+        final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>();
+
+        F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() {
+            @Override public void apply(CacheConfiguration c) {
+                cacheCfgs.put(c.getName(), c);
+            }
+        });
+
+        Collection<IgfsAttributes> attrVals = new ArrayList<>();
+
+        assert gridCfg.getGgfsConfiguration() != null;
+
+        for (IgfsConfiguration ggfsCfg : gridCfg.getGgfsConfiguration()) {
+            CacheConfiguration cacheCfg = cacheCfgs.get(ggfsCfg.getDataCacheName());
+
+            if (cacheCfg == null)
+                continue; // No cache for the given GGFS configuration.
+
+            CacheAffinityKeyMapper affMapper = cacheCfg.getAffinityMapper();
+
+            if (!(affMapper instanceof IgfsGroupDataBlocksKeyMapper))
+                // Do not create GGFS attributes for such a node nor throw error about invalid configuration.
+                // Configuration will be validated later, while starting GridGgfsProcessor.
+                continue;
+
+            attrVals.add(new IgfsAttributes(
+                ggfsCfg.getName(),
+                ggfsCfg.getBlockSize(),
+                ((IgfsGroupDataBlocksKeyMapper)affMapper).groupSize(),
+                ggfsCfg.getMetaCacheName(),
+                ggfsCfg.getDataCacheName(),
+                ggfsCfg.getDefaultMode(),
+                ggfsCfg.getPathModes(),
+                ggfsCfg.isFragmentizerEnabled()));
+        }
+
+        attrs.put(ATTR_GGFS, attrVals.toArray(new IgfsAttributes[attrVals.size()]));
+    }
+
+    /**
+     * @param name GGFS name, possibly {@code null}.
+     * @return {@code name} itself, or the {@code NULL_NAME} placeholder when it is {@code null}.
+     */
+    private String maskName(@Nullable String name) {
+        return name != null ? name : NULL_NAME;
+    }
+
+    /**
+     * Validates local GGFS configurations.
+     * <p>
+     * For every configuration the following is checked:
+     * <ul>
+     *     <li>GGFS name is unique among {@code cfgs};</li>
+     *     <li>data and metadata caches are configured locally, are distinct and have query indexing disabled;</li>
+     *     <li>data cache uses {@link IgfsGroupDataBlocksKeyMapper} as affinity mapper;</li>
+     *     <li>maximum space size (if set) fits into available heap and, if limited, offheap memory;</li>
+     *     <li>partitioned data cache has no backups;</li>
+     *     <li>secondary file system is set if {@code PROXY} mode is used as default or path mode.</li>
+     * </ul>
+     *
+     * @param cfgs GGFS configurations.
+     * @throws IgniteCheckedException If any of GGFS configurations is invalid.
+     */
+    private void validateLocalGgfsConfigurations(IgfsConfiguration[] cfgs) throws IgniteCheckedException {
+        Collection<String> cfgNames = new HashSet<>();
+
+        for (IgfsConfiguration cfg : cfgs) {
+            String name = cfg.getName();
+
+            if (cfgNames.contains(name))
+                throw new IgniteCheckedException("Duplicate GGFS name found (check configuration and " +
+                    "assign unique name to each): " + name);
+
+            GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName());
+
+            if (dataCache == null)
+                throw new IgniteCheckedException("Data cache is not configured locally for GGFS: " + cfg);
+
+            if (dataCache.configuration().isQueryIndexEnabled())
+                throw new IgniteCheckedException("GGFS data cache cannot start with enabled query indexing.");
+
+            GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName());
+
+            if (metaCache == null)
+                throw new IgniteCheckedException("Metadata cache is not configured locally for GGFS: " + cfg);
+
+            if (metaCache.configuration().isQueryIndexEnabled())
+                throw new IgniteCheckedException("GGFS metadata cache cannot start with enabled query indexing.");
+
+            if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName()))
+                throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName());
+
+            if (!(dataCache.configuration().getAffinityMapper() instanceof IgfsGroupDataBlocksKeyMapper))
+                throw new IgniteCheckedException("Invalid GGFS data cache configuration (key affinity mapper class should be " +
+                    IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg);
+
+            long maxSpaceSize = cfg.getMaxSpaceSize();
+
+            if (maxSpaceSize > 0) {
+                // Max space validation.
+                long maxHeapSize = Runtime.getRuntime().maxMemory();
+                long offHeapSize = dataCache.configuration().getOffHeapMaxMemory();
+
+                if (offHeapSize < 0 && maxSpaceSize > maxHeapSize)
+                    // Offheap is disabled.
+                    throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " +
+                        "memory [maxHeapSize=" + maxHeapSize + ", maxGgfsSpaceSize=" + maxSpaceSize + ']');
+                else if (offHeapSize > 0 && maxSpaceSize - maxHeapSize > offHeapSize)
+                    // Offheap is enabled, but limited. Subtraction is used instead of "maxHeapSize + offHeapSize"
+                    // because the sum may overflow: Runtime.maxMemory() returns Long.MAX_VALUE when there is
+                    // no inherent limit. Both operands of the subtraction are non-negative, so it cannot overflow.
+                    throw new IgniteCheckedException("Maximum GGFS space size cannot be greater than size of available heap " +
+                        "memory and offheap storage [maxHeapSize=" + maxHeapSize + ", offHeapSize=" + offHeapSize +
+                        ", maxGgfsSpaceSize=" + maxSpaceSize + ']');
+            }
+
+            if (dataCache.configuration().getCacheMode() == PARTITIONED) {
+                int backups = dataCache.configuration().getBackups();
+
+                if (backups != 0)
+                    throw new IgniteCheckedException("GGFS data cache cannot be used with backups (set backup count " +
+                        "to 0 and restart the grid): " + cfg.getDataCacheName());
+            }
+
+            if (maxSpaceSize == 0 && dataCache.configuration().getMemoryMode() == OFFHEAP_VALUES)
+                U.warn(log, "GGFS max space size is not specified but data cache values are stored off-heap (max " +
+                    "space will be limited to 80% of max JVM heap size): " + cfg.getName());
+
+            // Secondary file system is required as soon as PROXY mode is used anywhere.
+            boolean secondary = cfg.getDefaultMode() == PROXY;
+
+            Map<String, IgfsMode> pathModes = cfg.getPathModes();
+
+            if (!secondary && pathModes != null) {
+                for (IgfsMode mode : pathModes.values()) {
+                    if (mode == PROXY) {
+                        secondary = true;
+
+                        break; // Single PROXY path mode is enough.
+                    }
+                }
+            }
+
+            if (secondary) {
+                // When PROXY mode is used (as default or path mode), secondary FS config must be provided.
+                assertParameter(cfg.getSecondaryFileSystem() != null,
+                    "secondaryFileSystem cannot be null when mode is PROXY");
+            }
+
+            // Name is registered only after the configuration passed all checks.
+            cfgNames.add(name);
+        }
+    }
+
+    /**
+     * Checks that GGFS attributes of the remote node are consistent with the local ones.
+     * <p>
+     * GGFS instances with different names must not share meta or data cache names. GGFS instances
+     * with the same name must have equal values of every compared attribute. Nothing is checked if
+     * either node has no GGFS attributes.
+     *
+     * @param rmtNode Remote node.
+     * @throws IgniteCheckedException If check failed.
+     */
+    private void checkGgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException {
+        IgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(IgniteNodeAttributes.ATTR_GGFS);
+        IgfsAttributes[] rmtAttrs = rmtNode.attribute(IgniteNodeAttributes.ATTR_GGFS);
+
+        if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs))
+            return;
+
+        assert rmtAttrs != null && locAttrs != null;
+
+        for (IgfsAttributes rmt : rmtAttrs) {
+            for (IgfsAttributes loc : locAttrs) {
+                String rmtName = rmt.ggfsName();
+                String locName = loc.ggfsName();
+
+                if (F.eq(rmtName, locName)) {
+                    // Same GGFS on both nodes: every attribute has to match.
+                    checkSame("Data block size", "BlockSize", rmtNode.id(), rmt.blockSize(),
+                        loc.blockSize(), rmtName);
+
+                    checkSame("Affinity mapper group size", "GrpSize", rmtNode.id(), rmt.groupSize(),
+                        loc.groupSize(), rmtName);
+
+                    checkSame("Meta cache name", "MetaCacheName", rmtNode.id(), rmt.metaCacheName(),
+                        loc.metaCacheName(), rmtName);
+
+                    checkSame("Data cache name", "DataCacheName", rmtNode.id(), rmt.dataCacheName(),
+                        loc.dataCacheName(), rmtName);
+
+                    checkSame("Default mode", "DefaultMode", rmtNode.id(), rmt.defaultMode(),
+                        loc.defaultMode(), rmtName);
+
+                    checkSame("Path modes", "PathModes", rmtNode.id(), rmt.pathModes(),
+                        loc.pathModes(), rmtName);
+
+                    checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNode.id(), rmt.fragmentizerEnabled(),
+                        loc.fragmentizerEnabled(), rmtName);
+                }
+                else {
+                    // Different GGFSes: they are not allowed to share caches.
+                    if (F.eq(rmt.metaCacheName(), loc.metaCacheName()))
+                        throw new IgniteCheckedException(
+                            "Meta cache names should be different for different GGFS instances configuration " +
+                            "(fix configuration or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK +
+                            "=true system property) [metaCacheName=" + rmt.metaCacheName() +
+                            ", locNodeId=" + ctx.localNodeId() +
+                            ", rmtNodeId=" + rmtNode.id() +
+                            ", locGgfsName=" + locName +
+                            ", rmtGgfsName=" + rmtName + ']');
+
+                    if (F.eq(rmt.dataCacheName(), loc.dataCacheName()))
+                        throw new IgniteCheckedException(
+                            "Data cache names should be different for different GGFS instances configuration " +
+                            "(fix configuration or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK +
+                            "=true system property)[dataCacheName=" + rmt.dataCacheName() +
+                            ", locNodeId=" + ctx.localNodeId() +
+                            ", rmtNodeId=" + rmtNode.id() +
+                            ", locGgfsName=" + locName +
+                            ", rmtGgfsName=" + rmtName + ']');
+                }
+            }
+        }
+    }
+
+    /**
+     * Ensures that a GGFS configuration property has equal values on the local and the remote node.
+     *
+     * @param name Human-readable property name the error message starts with.
+     * @param propName Property name appended to {@code rmt} and {@code loc} prefixes in the error message.
+     * @param rmtNodeId Remote node ID.
+     * @param rmtVal Value on the remote node.
+     * @param locVal Value on the local node.
+     * @param ggfsName GGFS name.
+     * @throws IgniteCheckedException If the values are not equal.
+     */
+    private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String ggfsName)
+        throws IgniteCheckedException {
+        if (F.eq(rmtVal, locVal))
+            return;
+
+        throw new IgniteCheckedException(name + " should be the same on all nodes in grid for GGFS configuration " +
+            "(fix configuration or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK +
+            "=true system property ) [rmtNodeId=" + rmtNodeId +
+            ", rmt" + propName + "=" + rmtVal +
+            ", loc" + propName + "=" + locVal +
+            ", ggfName=" + ggfsName + ']');
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorAdapter.java
new file mode 100644
index 0000000..9e49c64
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorAdapter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.ipc.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Ignite file system processor adapter.
+ * <p>
+ * Declares the operations a GGFS processor has to provide: access to GGFS instances,
+ * access to their server endpoints and creation of compute jobs for GGFS jobs.
+ */
+public abstract class IgfsProcessorAdapter extends GridProcessorAdapter {
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    protected IgfsProcessorAdapter(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * Gets all GGFS instances.
+     *
+     * @return Collection of GGFS instances.
+     */
+    public abstract Collection<IgniteFs> ggfss();
+
+    /**
+     * Gets GGFS instance.
+     *
+     * @param name (Nullable) GGFS name.
+     * @return GGFS instance, possibly {@code null}.
+     */
+    @Nullable public abstract IgniteFs ggfs(@Nullable String name);
+
+    /**
+     * Gets server endpoints for particular GGFS.
+     *
+     * @param name (Nullable) GGFS name.
+     * @return Collection of endpoints or {@code null} in case GGFS is not defined.
+     */
+    @Nullable public abstract Collection<IpcServerEndpoint> endpoints(@Nullable String name);
+
+    /**
+     * Create compute job for the given GGFS job.
+     *
+     * @param job GGFS job.
+     * @param ggfsName (Nullable) GGFS name.
+     * @param path Path.
+     * @param start Start position.
+     * @param length Length.
+     * @param recRslv Record resolver.
+     * @return Compute job, possibly {@code null}.
+     */
+    @Nullable public abstract ComputeJob createJob(IgfsJob job, @Nullable String ggfsName, IgfsPath path,
+        long start, long length, IgfsRecordResolver recRslv);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSamplingKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSamplingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSamplingKey.java
new file mode 100644
index 0000000..c91d0d0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSamplingKey.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Internal key used to track if sampling enabled or disabled for particular GGFS instance.
+ */
+class IgfsSamplingKey implements GridCacheInternal, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** GGFS name. */
+    private String name;
+
+    /**
+     * Default constructor.
+     *
+     * @param name - GGFS name.
+     */
+    IgfsSamplingKey(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public IgfsSamplingKey() {
+        // No-op.
+    }
+
+    /**
+     * @return GGFS name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return name == null ? 0 : name.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return this == obj || (obj instanceof IgfsSamplingKey && F.eq(name, ((IgfsSamplingKey)obj).name));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        name = U.readString(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsSamplingKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryInputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryInputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryInputStreamDescriptor.java
new file mode 100644
index 0000000..6e48103
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryInputStreamDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.*;
+
+/**
+ * Immutable pair describing an input stream opened to the secondary file system:
+ * file info in the primary file system plus the reader of the secondary file system.
+ */
+public class IgfsSecondaryInputStreamDescriptor {
+    /** File info in the primary file system. */
+    private final IgfsFileInfo info;
+
+    /** Secondary file system reader. */
+    private final IgfsReader rdr;
+
+    /**
+     * Creates descriptor. Both arguments are expected to be non-{@code null} (checked with assertions).
+     *
+     * @param info File info in the primary file system.
+     * @param secReader Secondary file system reader.
+     */
+    IgfsSecondaryInputStreamDescriptor(IgfsFileInfo info, IgfsReader secReader) {
+        assert info != null;
+        assert secReader != null;
+
+        this.info = info;
+
+        rdr = secReader;
+    }
+
+    /**
+     * @return File info in the primary file system.
+     */
+    IgfsFileInfo info() {
+        return info;
+    }
+
+    /**
+     * @return Secondary file system reader.
+     */
+    IgfsReader reader() {
+        return rdr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
new file mode 100644
index 0000000..f3921e8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+
+/**
+ * Immutable holder describing an output stream opened to the secondary file system:
+ * parent ID and file info in the primary file system plus the output stream itself.
+ */
+public class IgfsSecondaryOutputStreamDescriptor {
+    /** Parent ID in the primary file system. */
+    private final IgniteUuid parentId;
+
+    /** File info in the primary file system. */
+    private final IgfsFileInfo info;
+
+    /** Output stream to the secondary file system. */
+    private final OutputStream out;
+
+    /**
+     * Creates descriptor. All arguments are expected to be non-{@code null} (checked with assertions).
+     *
+     * @param parentId Parent ID in the primary file system.
+     * @param info File info in the primary file system.
+     * @param out Output stream to the secondary file system.
+     */
+    IgfsSecondaryOutputStreamDescriptor(IgniteUuid parentId, IgfsFileInfo info, OutputStream out) {
+        assert parentId != null;
+
+        this.parentId = parentId;
+
+        assert info != null;
+
+        this.info = info;
+
+        assert out != null;
+
+        this.out = out;
+    }
+
+    /**
+     * @return Parent ID in the primary file system.
+     */
+    IgniteUuid parentId() {
+        return parentId;
+    }
+
+    /**
+     * @return File info in the primary file system.
+     */
+    IgfsFileInfo info() {
+        return info;
+    }
+
+    /**
+     * @return Output stream to the secondary file system.
+     */
+    OutputStream out() {
+        return out;
+    }
+}


Mime
View raw message