ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dsetrak...@apache.org
Subject [15/61] [abbrv] incubator-ignite git commit: sprint-1 - Renaming and moving ignitefs classes.
Date Fri, 06 Feb 2015 04:25:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java
new file mode 100644
index 0000000..967e26b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsFileSystem.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Common file system interface. It provides a typical generalized "view" of any file system:
+ * <ul>
+ *     <li>list directories or get information for a single path</li>
+ *     <li>create/move/delete files or directories</li>
+ *     <li>write/read data streams into/from files</li>
+ * </ul>
+ *
+ * This is the minimum functionality needed to work as a secondary file system in the dual modes of GGFS.
+ */
+public interface IgniteFsFileSystem {
+    /** File property: user name. */
+    public static final String PROP_USER_NAME = "usrName";
+
+    /** File property: group name. */
+    public static final String PROP_GROUP_NAME = "grpName";
+
+    /** File property: permission. */
+    public static final String PROP_PERMISSION = "permission";
+
+    /**
+     * Checks if the specified path exists in the file system.
+     *
+     * @param path Path to check for existence in the file system.
+     * @return {@code true} if the path exists, {@code false} otherwise.
+     * @throws IgniteException In case of error.
+     */
+    public boolean exists(IgniteFsPath path);
+
+    /**
+     * Updates file information for the specified path. Existing properties that are not listed in the passed map
+     * will not be affected. Other properties will be added or overwritten. Passed properties with {@code null} values
+     * will be removed from the stored properties or ignored if they don't exist in the file info.
+     * <p>
+     * When working in {@code DUAL_SYNC} or {@code DUAL_ASYNC} modes only the following properties will be propagated
+     * to the secondary file system:
+     * <ul>
+     * <li>{@code usrName} - file owner name;</li>
+     * <li>{@code grpName} - file owner group;</li>
+     * <li>{@code permission} - Unix-style string representing file permissions.</li>
+     * </ul>
+     *
+     * @param path File path to set properties for.
+     * @param props Properties to update.
+     * @return File information for specified path or {@code null} if such path does not exist.
+     * @throws IgniteException In case of error.
+     */
+    @Nullable public IgniteFsFile update(IgniteFsPath path, Map<String, String> props) throws IgniteException;
+
+    /**
+     * Renames/moves a file.
+     * <p>
+     * You are free to rename/move data files as you wish, but directories can only be renamed.
+     * You cannot move a directory between different parent directories.
+     * <p>
+     * Examples:
+     * <ul>
+     *     <li>"/work/file.txt" =&gt; "/home/project/Presentation Scenario.txt"</li>
+     *     <li>"/work" =&gt; "/work-2012.bkp"</li>
+     *     <li>"/work" =&gt; "<strike>/backups/work</strike>" - such operation is restricted for directories.</li>
+     * </ul>
+     *
+     * @param src Source file path to rename.
+     * @param dest Destination file path. If destination path is a directory, then source file will be placed
+     *     into destination directory with original name.
+     * @throws IgniteException In case of error.
+     * @throws IgniteFsFileNotFoundException If source file doesn't exist.
+     */
+    public void rename(IgniteFsPath src, IgniteFsPath dest) throws IgniteException;
+
+    /**
+     * Deletes a file or directory.
+     *
+     * @param path File path to delete.
+     * @param recursive Delete non-empty directories recursively.
+     * @return {@code True} in case of success, {@code false} otherwise.
+     * @throws IgniteException In case of error.
+     */
+    public boolean delete(IgniteFsPath path, boolean recursive) throws IgniteException;
+
+    /**
+     * Creates directories under specified path.
+     *
+     * @param path Path of directories chain to create.
+     * @throws IgniteException In case of error.
+     */
+    public void mkdirs(IgniteFsPath path) throws IgniteException;
+
+    /**
+     * Creates directories under specified path with the specified properties.
+     *
+     * @param path Path of directories chain to create.
+     * @param props Metadata properties to set on created directories.
+     * @throws IgniteException In case of error.
+     */
+    public void mkdirs(IgniteFsPath path, @Nullable Map<String, String> props) throws IgniteException;
+
+    /**
+     * Lists file paths under the specified path.
+     *
+     * @param path Path to list files under.
+     * @return Collection of paths under the specified path.
+     * @throws IgniteException In case of error.
+     * @throws IgniteFsFileNotFoundException If path doesn't exist.
+     */
+    public Collection<IgniteFsPath> listPaths(IgniteFsPath path) throws IgniteException;
+
+    /**
+     * Lists files under the specified path.
+     *
+     * @param path Path to list files under.
+     * @return Collection of file descriptors under the specified path.
+     * @throws IgniteException In case of error.
+     * @throws IgniteFsFileNotFoundException If path doesn't exist.
+     */
+    public Collection<IgniteFsFile> listFiles(IgniteFsPath path) throws IgniteException;
+
+    /**
+     * Opens a file for reading.
+     *
+     * @param path File path to read.
+     * @param bufSize Read buffer size (bytes) or {@code zero} to use default value.
+     * @return Reader to read file data from.
+     * @throws IgniteException In case of error.
+     * @throws IgniteFsFileNotFoundException If path doesn't exist.
+     */
+    public IgniteFsReader open(IgniteFsPath path, int bufSize) throws IgniteException;
+
+    /**
+     * Creates a file and opens it for writing.
+     *
+     * @param path File path to create.
+     * @param overwrite Overwrite file if it already exists. Note: you cannot overwrite an existing directory.
+     * @return File output stream to write data to.
+     * @throws IgniteException In case of error.
+     */
+    public OutputStream create(IgniteFsPath path, boolean overwrite) throws IgniteException;
+
+    /**
+     * Creates a file with the specified write settings and properties and opens it for writing.
+     *
+     * @param path File path to create.
+     * @param bufSize Write buffer size (bytes) or {@code zero} to use default value.
+     * @param overwrite Overwrite file if it already exists. Note: you cannot overwrite an existing directory.
+     * @param replication Replication factor.
+     * @param blockSize Block size.
+     * @param props File properties to set.
+     * @return File output stream to write data to.
+     * @throws IgniteException In case of error.
+     */
+    public OutputStream create(IgniteFsPath path, int bufSize, boolean overwrite, int replication, long blockSize,
+       @Nullable Map<String, String> props) throws IgniteException;
+
+    /**
+     * Opens an output stream for appending data to a file, optionally creating the file if it doesn't exist.
+     *
+     * @param path File path to append.
+     * @param bufSize Write buffer size (bytes) or {@code zero} to use default value.
+     * @param create Create file if it doesn't exist yet.
+     * @param props File properties to set only in case the file was just created.
+     * @return File output stream to append data to.
+     * @throws IgniteException In case of error.
+     * @throws IgniteFsFileNotFoundException If path doesn't exist and create flag is {@code false}.
+     */
+    public OutputStream append(IgniteFsPath path, int bufSize, boolean create, @Nullable Map<String, String> props)
+        throws IgniteException;
+
+    /**
+     * Gets file information for the specified path.
+     *
+     * @param path Path to get information for.
+     * @return File information for specified path or {@code null} if such path does not exist.
+     * @throws IgniteException In case of error.
+     */
+    @Nullable public IgniteFsFile info(IgniteFsPath path) throws IgniteException;
+
+    /**
+     * Gets used space in bytes.
+     *
+     * @return Used space in bytes.
+     * @throws IgniteException In case of error.
+     */
+    public long usedSpaceSize() throws IgniteException;
+
+    /**
+     * Gets the implementation-specific properties of the file system.
+     *
+     * @return Map of properties.
+     */
+    public Map<String,String> properties();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java
new file mode 100644
index 0000000..f40b10e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsGroupDataBlocksKeyMapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * {@code GGFS} class providing the ability to group a file's data blocks together on one node.
+ * All blocks within the same group are guaranteed to be cached together on the same node.
+ * Group size parameter controls how many sequential blocks will be cached together on the same node.
+ * <p>
+ * For example, if block size is {@code 64kb} and group size is {@code 256}, then each group will contain
+ * {@code 64kb * 256 = 16Mb}. Larger group sizes would reduce number of splits required to run map-reduce
+ * tasks, but will increase inequality of data size being stored on different nodes.
+ * <p>
+ * Note that {@link #groupSize()} parameter must correlate to Hadoop split size parameter defined
+ * in Hadoop via {@code mapred.max.split.size} property. Ideally you want all blocks accessed
+ * within one split to be mapped to {@code 1} group, so they can be located on the same grid node.
+ * For example, default Hadoop split size is {@code 64mb} and default {@code GGFS} block size
+ * is {@code 64kb}. This means that to make sure that each split goes only through blocks on
+ * the same node (without hopping between nodes over network), we have to make the {@link #groupSize()}
+ * value be equal to {@code 64mb / 64kb = 1024}.
+ * <p>
+ * It is required for {@code GGFS} data cache to be configured with this mapper. Here is an
+ * example of how it can be specified in XML configuration:
+ * <pre name="code" class="xml">
+ * &lt;bean id="cacheCfgBase" class="org.gridgain.grid.cache.GridCacheConfiguration" abstract="true"&gt;
+ *     ...
+ *     &lt;property name="affinityMapper"&gt;
+ *         &lt;bean class="org.apache.ignite.ignitefs.IgniteFsGroupDataBlocksKeyMapper"&gt;
+ *             &lt;!-- How many sequential blocks will be stored on the same node. --&gt;
+ *             &lt;constructor-arg value="512"/&gt;
+ *         &lt;/bean&gt;
+ *     &lt;/property&gt;
+ *     ...
+ * &lt;/bean&gt;
+ * </pre>
+ */
+public class IgniteFsGroupDataBlocksKeyMapper extends GridCacheDefaultAffinityKeyMapper {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Size of the group. */
+    private final int grpSize;
+
+    /**
+     * Constructs affinity mapper to group several data blocks with the same key.
+     *
+     * @param grpSize Size of the group in blocks, must be at least {@code 1}.
+     */
+    public IgniteFsGroupDataBlocksKeyMapper(int grpSize) {
+        A.ensure(grpSize >= 1, "grpSize >= 1");
+
+        this.grpSize = grpSize;
+    }
+
+    /** {@inheritDoc} For block keys lacking an explicit affinity key, combines file ID hash with block group ID. */
+    @Override public Object affinityKey(Object key) {
+        if (key != null && GridGgfsBlockKey.class.equals(key.getClass())) {
+            GridGgfsBlockKey blockKey = (GridGgfsBlockKey)key;
+
+            if (blockKey.affinityKey() != null)
+                return blockKey.affinityKey();
+
+            long grpId = blockKey.getBlockId() / grpSize;
+
+            return blockKey.getFileId().hashCode() + (int)(grpId ^ (grpId >>> 32));
+        }
+
+        return super.affinityKey(key);
+    }
+
+    /**
+     * @return Size of the group.
+     */
+    public int groupSize() {
+        return grpSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteFsGroupDataBlocksKeyMapper.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java
new file mode 100644
index 0000000..510b948
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInputStream.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import java.io.*;
+
+/**
+ * {@code GGFS} input stream to read data from the file system.
+ * It provides several additional methods for positional access (seek, position and positional reads).
+ */
+public abstract class IgniteFsInputStream extends InputStream implements IgniteFsReader {
+    /**
+     * Gets the file length as of the moment the file was opened.
+     *
+     * @return File length.
+     */
+    public abstract long length();
+
+    /**
+     * Seek to the specified position.
+     *
+     * @param pos Position to seek to.
+     * @throws IOException In case of IO exception.
+     */
+    public abstract void seek(long pos) throws IOException;
+
+    /**
+     * Get the current position in the input stream.
+     *
+     * @return The current position in the input stream.
+     * @throws IOException In case of IO exception.
+     */
+    public abstract long position() throws IOException;
+
+    /**
+     * Read bytes from the given position in the stream to the given buffer.
+     * Continues to read until passed buffer becomes filled.
+     *
+     * @param pos Position in the input stream to seek.
+     * @param buf Buffer into which data is read.
+     * @throws IOException In case of IO exception.
+     */
+    public abstract void readFully(long pos, byte[] buf) throws IOException;
+
+    /**
+     * Read bytes from the given position in the stream to the given region of the buffer.
+     * @param pos Position in the input stream to seek.
+     * @param buf Buffer into which data is read.
+     * @param off Offset in the buffer from which stream data should be written.
+     * @param len The number of bytes to read.
+     * @throws IOException In case of IO exception.
+     */
+    public abstract void readFully(long pos, byte[] buf, int off, int len) throws IOException;
+
+    /**
+     * Read bytes from the given position in the stream to the given region of the buffer.
+     * @param pos Position in the input stream to seek.
+     * @param buf Buffer into which data is read.
+     * @param off Offset in the buffer from which stream data should be written.
+     * @param len The number of bytes to read.
+     * @return Total number of bytes read into the buffer, or -1 if there is no more data (EOF).
+     * @throws IOException In case of IO exception.
+     */
+    @Override public abstract int read(long pos, byte[] buf, int off, int len) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java
new file mode 100644
index 0000000..e3fe3e2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidHdfsVersionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+/**
+ * Exception thrown when Ignite detects that the remote HDFS version differs from the version of HDFS libraries
+ * in the Ignite classpath.
+ */
+public class IgniteFsInvalidHdfsVersionException extends IgniteFsException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Error message.
+     */
+    public IgniteFsInvalidHdfsVersionException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param msg Error message.
+     * @param cause Error cause.
+     */
+    public IgniteFsInvalidHdfsVersionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java
new file mode 100644
index 0000000..842c6f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsInvalidPathException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * {@code GGFS} exception indicating that the operation target is invalid
+ * (e.g. the path is not a file when a file is expected).
+ */
+public class IgniteFsInvalidPathException extends IgniteFsException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates exception with given error message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteFsInvalidPathException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates exception with given exception cause.
+     *
+     * @param cause Exception cause.
+     */
+    public IgniteFsInvalidPathException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates exception with given error message and exception cause.
+     *
+     * @param msg Error message.
+     * @param cause Error cause, may be {@code null}.
+     */
+    public IgniteFsInvalidPathException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java
new file mode 100644
index 0000000..b45260a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMetrics.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+/**
+ * {@code GGFS} metrics snapshot for the file system. Note that some metrics are global and
+ * some are local (i.e. per each node).
+ */
+public interface IgniteFsMetrics {
+    /**
+     * Gets local used space in bytes. This is the sum of all file chunks stored on local node.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Node used space in bytes.
+     */
+    public long localSpaceSize();
+
+    /**
+     * Gets maximum amount of data that can be stored on local node. This metric is either
+     * equal to {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaxSpaceSize()}, or, if it is {@code 0}, equal to
+     * {@code 80%} of maximum heap size allocated for JVM.
+     *
+     * @return Maximum GGFS local space size.
+     */
+    public long maxSpaceSize();
+
+    /**
+     * Gets the space, in bytes, used in the secondary file system.
+     * <p>
+     * This is a global metric.
+     *
+     * @return Used space in the secondary file system or {@code 0} in case no secondary file system is configured.
+     */
+    public long secondarySpaceSize();
+
+    /**
+     * Gets number of directories created in file system.
+     * <p>
+     * This is a global metric.
+     *
+     * @return Number of directories.
+     */
+    public int directoriesCount();
+
+    /**
+     * Gets number of files stored in file system.
+     * <p>
+     * This is a global metric.
+     *
+     * @return Number of files.
+     */
+    public int filesCount();
+
+    /**
+     * Gets number of files that are currently opened for reading.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Number of files opened for reading.
+     */
+    public int filesOpenedForRead();
+
+    /**
+     * Gets number of files that are currently opened for writing.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Number of files opened for writing.
+     */
+    public int filesOpenedForWrite();
+
+    /**
+     * Gets total blocks read, local and remote.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total blocks read.
+     */
+    public long blocksReadTotal();
+
+    /**
+     * Gets total remote blocks read.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total remote blocks read.
+     */
+    public long blocksReadRemote();
+
+    /**
+     * Gets total blocks written, local and remote.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total blocks written.
+     */
+    public long blocksWrittenTotal();
+
+    /**
+     * Gets total remote blocks written.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total remote blocks written.
+     */
+    public long blocksWrittenRemote();
+
+    /**
+     * Gets total bytes read.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total bytes read.
+     */
+    public long bytesRead();
+
+    /**
+     * Gets total bytes read time.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total bytes read time.
+     */
+    public long bytesReadTime();
+
+    /**
+     * Gets total bytes written.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total bytes written.
+     */
+    public long bytesWritten();
+
+    /**
+     * Gets total bytes write time.
+     * <p>
+     * This is a local metric.
+     *
+     * @return Total bytes write time.
+     */
+    public long bytesWriteTime();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java
new file mode 100644
index 0000000..d847387
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsMode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * {@code GGFS} mode defining interactions with underlying secondary Hadoop file system.
+ * Secondary Hadoop file system is provided for pass-through, write-through, and
+ * read-through purposes.
+ * <p>
+ * This mode is configured via {@link org.apache.ignite.configuration.IgniteFsConfiguration#getDefaultMode()}
+ * configuration property.
+ */
+public enum IgniteFsMode {
+    /**
+     * In this mode {@code GGFS} will not delegate to secondary Hadoop file system and will
+     * cache all the files in memory only.
+     */
+    PRIMARY,
+
+    /**
+     * In this mode {@code GGFS} will not cache any files in memory and will only pass them
+     * through to secondary Hadoop file system. If this mode is enabled, then
+     * secondary Hadoop file system must be configured.
+     *
+     * @see org.apache.ignite.configuration.IgniteFsConfiguration#getSecondaryHadoopFileSystemUri()
+     */
+    PROXY,
+
+    /**
+     * In this mode {@code GGFS} will cache files locally and also <i>synchronously</i>
+     * write them through to secondary Hadoop file system.
+     * <p>
+     * If secondary Hadoop file system is not configured, then this mode behaves like
+     * {@link #PRIMARY} mode.
+     *
+     * @see org.apache.ignite.configuration.IgniteFsConfiguration#getSecondaryHadoopFileSystemUri()
+     */
+    DUAL_SYNC,
+
+    /**
+     * In this mode {@code GGFS} will cache files locally and also <i>asynchronously</i>
+     * write them through to secondary Hadoop file system.
+     * <p>
+     * If secondary Hadoop file system is not configured, then this mode behaves like
+     * {@link #PRIMARY} mode.
+     *
+     * @see org.apache.ignite.configuration.IgniteFsConfiguration#getSecondaryHadoopFileSystemUri()
+     */
+    DUAL_ASYNC;
+
+    /** Enumerated values, cached once so {@link #fromOrdinal(int)} does not call {@code values()} on every lookup. */
+    private static final IgniteFsMode[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value or {@code null} if the ordinal is negative or not less than the number of values.
+     */
+    @Nullable public static IgniteFsMode fromOrdinal(int ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java
new file mode 100644
index 0000000..6fc354e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutOfSpaceException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * {@code GGFS} exception that is thrown when an out-of-space condition is detected.
+ * It is thrown when the number of writes written to {@code GGFS} data nodes exceeds
+ * its maximum value (which is configured per node).
+ */
+public class IgniteFsOutOfSpaceException extends IgniteFsException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an exception with the given error message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteFsOutOfSpaceException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception with the given exception cause.
+     *
+     * @param cause Exception cause.
+     */
+    public IgniteFsOutOfSpaceException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with the given error message and the given exception cause.
+     *
+     * @param msg Error message.
+     * @param cause Exception cause, may be {@code null}.
+     */
+    public IgniteFsOutOfSpaceException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java
new file mode 100644
index 0000000..9cb8181
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsOutputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import java.io.*;
+
+/**
+ * {@code GGFS} output stream to write data into the file system.
+ */
+public abstract class IgniteFsOutputStream extends OutputStream {
+    /**
+     * Transfers the specified number of bytes from data input to this output stream.
+     * This method is intended to avoid unnecessary temporary buffer creation and byte array copying.
+     *
+     * @param in Data input to copy bytes from.
+     * @param len Data length to copy.
+     * @throws IOException If write failed, read from input failed or there is not enough data in data input.
+     */
+    public abstract void transferFrom(DataInput in, int len) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java
new file mode 100644
index 0000000..04a1383
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsParentNotDirectoryException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Exception thrown when a parent that is supposed to be a directory is a file.
+ */
+public class IgniteFsParentNotDirectoryException extends IgniteFsInvalidPathException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Error message.
+     */
+    public IgniteFsParentNotDirectoryException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param cause Exception cause.
+     */
+    public IgniteFsParentNotDirectoryException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param msg Error message.
+     * @param cause Exception cause, may be {@code null}.
+     */
+    public IgniteFsParentNotDirectoryException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java
new file mode 100644
index 0000000..f6e1d14
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPath.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * {@code GGFS} path to file in the file system. For example, to get information about
+ * a file you would use the following code:
+ * <pre name="code" class="java">
+ *     IgniteFsPath dirPath = new IgniteFsPath("/my/working/dir");
+ *     IgniteFsPath filePath = new IgniteFsPath(dirPath, "file.txt");
+ *
+ *     // Get metadata about file.
+ *     GridGgfsFile file = ggfs.info(filePath);
+ * </pre>
+ */
+public final class IgniteFsPath implements Comparable<IgniteFsPath>, Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** The directory separator character. */
+    private static final char SLASH_CHAR = '/';
+
+    /** The directory separator. */
+    private static final String SLASH = "/";
+
+    /** Path string of this path. Should never change after object creation or de-serialization. */
+    private String path;
+
+    /**
+     * Constructs default root path.
+     */
+    public IgniteFsPath() {
+        path = SLASH;
+    }
+
+    /**
+     * Constructs a path from the path component of a URI.
+     *
+     * @param uri URI to create path from.
+     */
+    public IgniteFsPath(URI uri) {
+        A.notNull(uri, "uri");
+
+        path = normalizePath(uri.getPath());
+    }
+
+    /**
+     * Constructs a path from the path string.
+     *
+     * @param path Path string, must not be {@code null} or empty.
+     */
+    public IgniteFsPath(String path) {
+        A.ensure(!F.isEmpty(path), "'path' is null or empty");
+
+        this.path = normalizePath(path);
+    }
+
+    /**
+     * Resolves a child path against a parent path.
+     *
+     * @param parentPath Parent path.
+     * @param childPath Child path.
+     */
+    public IgniteFsPath(IgniteFsPath parentPath, String childPath) {
+        A.notNull(parentPath, "parentPath");
+
+        String path = GridFilenameUtils.concat(parentPath.path, childPath);
+
+        if (F.isEmpty(path))
+            throw new IllegalArgumentException("Failed to parse path" +
+                " [parent=" + parentPath + ", childPath=" + childPath + ']');
+
+        this.path = normalizePath(path);
+    }
+
+    /**
+     * Normalizes the path and checks that the result is (1) not empty, (2) absolute and (3) has no trailing separator.
+     *
+     * @param path Path.
+     * @return Normalized path.
+     */
+    private static String normalizePath(String path) {
+        assert path != null;
+
+        String normalizedPath = GridFilenameUtils.normalizeNoEndSeparator(path, true);
+
+        if (F.isEmpty(normalizedPath))
+            throw new IllegalArgumentException("Failed to normalize path: " + path);
+
+        if (!SLASH.equals(GridFilenameUtils.getPrefix(normalizedPath)))
+            throw new IllegalArgumentException("Path should be absolute: " + path);
+
+        assert !normalizedPath.isEmpty() : "Expects normalized path is not empty.";
+        assert normalizedPath.length() == 1 || !normalizedPath.endsWith(SLASH) :
+            "Expects normalized path is root or don't ends with '/' symbol.";
+
+        return normalizedPath;
+    }
+
+    /**
+     * Returns the final component of this path.
+     *
+     * @return The final component of this path.
+     */
+    public String name() {
+        return GridFilenameUtils.getName(path);
+    }
+
+    /**
+     * Returns a root for this path.
+     *
+     * @return Root for this path (always a new default root path instance).
+     */
+    public IgniteFsPath root() {
+        return new IgniteFsPath();
+    }
+
+    /**
+     * Splits the full path into components.
+     *
+     * @return Path components, empty list for a single-character (root) path.
+     */
+    public List<String> components() {
+        String path = this.path;
+
+        assert path.length() >= 1 : "Path expected to be absolute: " + path;
+
+        // Path is short-living object, so we don't need to cache component's resolution result.
+        return path.length() == 1 ? Collections.<String>emptyList() : Arrays.asList(path.substring(1).split(SLASH));
+    }
+
+    /**
+     * Returns the parent of a path or {@code null} if at root.
+     *
+     * @return The parent of a path or {@code null} if at root.
+     */
+    @Nullable public IgniteFsPath parent() {
+        String path = this.path;
+
+        if (path.length() == 1)
+            return null; // Current path is root.
+
+        path = GridFilenameUtils.getFullPathNoEndSeparator(path);
+
+        return new IgniteFsPath(path);
+    }
+
+    /**
+     * Adds a suffix to the final name in the path.
+     *
+     * @param suffix Suffix, must not be {@code null}, empty or contain the separator.
+     * @return Path with suffix.
+     */
+    public IgniteFsPath suffix(String suffix) {
+        A.ensure(!F.isEmpty(suffix), "'suffix' is null or empty.");
+        A.ensure(!suffix.contains(SLASH), "'suffix' contains file's separator '" + SLASH + "'");
+
+        return new IgniteFsPath(path + suffix);
+    }
+
+    /**
+     * Returns the number of elements in this path.
+     *
+     * @return The number of elements in this path, zero depth means root directory.
+     */
+    public int depth() {
+        final String path = this.path;
+        final int size = path.length();
+
+        assert size >= 1 && path.charAt(0) == SLASH_CHAR : "Expects absolute path: " + path;
+
+        if (size == 1)
+            return 0;
+
+        int depth = 1;
+
+        // Ignore the first character.
+        for (int i = 1; i < size; i++)
+            if (path.charAt(i) == SLASH_CHAR)
+                depth++;
+
+        return depth;
+    }
+
+    /**
+     * Checks whether this path is a sub-directory of argument.
+     *
+     * @param path Path to check.
+     * @return {@code True} if this path starts with the argument path followed by {@code '/'}.
+     */
+    public boolean isSubDirectoryOf(IgniteFsPath path) {
+        A.notNull(path, "path");
+
+        return this.path.startsWith(path.path.endsWith(SLASH) ? path.path : path.path + SLASH);
+    }
+
+    /**
+     * Checks if paths are identical.
+     *
+     * @param path Path to check.
+     * @return {@code True} if paths are identical.
+     */
+    public boolean isSame(IgniteFsPath path) {
+        A.notNull(path, "path");
+
+        return this == path || this.path.equals(path.path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(IgniteFsPath o) {
+        return path.compareTo(o.path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        path = U.readString(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return path.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        return o == this || o != null && getClass() == o.getClass() && path.equals(((IgniteFsPath)o).path);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java
new file mode 100644
index 0000000..17fa71a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathAlreadyExistsException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Exception thrown when the target path that is supposed to be created already exists.
+ */
+public class IgniteFsPathAlreadyExistsException extends IgniteFsInvalidPathException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Error message.
+     */
+    public IgniteFsPathAlreadyExistsException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param cause Exception cause.
+     */
+    public IgniteFsPathAlreadyExistsException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param msg Error message.
+     * @param cause Exception cause, may be {@code null}.
+     */
+    public IgniteFsPathAlreadyExistsException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java
new file mode 100644
index 0000000..a03e704
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsPathSummary.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Path summary: total files count, total directories count, total length.
+ */
+public class IgniteFsPathSummary implements Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Path for which the summary is obtained. */
+    private IgniteFsPath path;
+
+    /** Files count. */
+    private int filesCnt;
+
+    /** Directories count. */
+    private int dirCnt;
+
+    /** Total length consumed. */
+    private long totalLen;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public IgniteFsPathSummary() {
+        // No-op.
+    }
+
+    /**
+     * Constructs an empty path summary (all counters are left at zero).
+     *
+     * @param path Path.
+     */
+    public IgniteFsPathSummary(IgniteFsPath path) {
+        this.path = path;
+    }
+
+    /**
+     * @return Files count.
+     */
+    public int filesCount() {
+        return filesCnt;
+    }
+
+    /**
+     * @param filesCnt Files count.
+     */
+    public void filesCount(int filesCnt) {
+        this.filesCnt = filesCnt;
+    }
+
+    /**
+     * @return Directories count.
+     */
+    public int directoriesCount() {
+        return dirCnt;
+    }
+
+    /**
+     * @param dirCnt Directories count.
+     */
+    public void directoriesCount(int dirCnt) {
+        this.dirCnt = dirCnt;
+    }
+
+    /**
+     * @return Total length.
+     */
+    public long totalLength() {
+        return totalLen;
+    }
+
+    /**
+     * @param totalLen Total length.
+     */
+    public void totalLength(long totalLen) {
+        this.totalLen = totalLen;
+    }
+
+    /**
+     * @return Path for which summary is obtained.
+     */
+    public IgniteFsPath path() {
+        return path;
+    }
+
+    /**
+     * @param path Path for which summary is obtained.
+     */
+    public void path(IgniteFsPath path) {
+        this.path = path;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(filesCnt);
+        out.writeInt(dirCnt);
+        out.writeLong(totalLen);
+
+        path.writeExternal(out); // NOTE(review): throws NullPointerException if path was never set.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        filesCnt = in.readInt();
+        dirCnt = in.readInt();
+        totalLen = in.readLong();
+
+        path = new IgniteFsPath(); // Start from default root path, then overwrite it with the serialized value.
+        path.readExternal(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteFsPathSummary.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java
new file mode 100644
index 0000000..6e16792
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/IgniteFsReader.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import java.io.*;
+
+/**
+ * The simplest data input interface to read from secondary file system in dual modes.
+ */
+public interface IgniteFsReader extends Closeable {
+    /**
+     * Reads up to the specified number of bytes, from a given position within a file, and returns the number of
+     * bytes read.
+     *
+     * @param pos Position in the input stream to seek.
+     * @param buf Buffer into which data is read.
+     * @param off Offset in the buffer at which the read data should be written.
+     * @param len The number of bytes to read.
+     * @return Total number of bytes read into the buffer, or -1 if there is no more data (EOF).
+     * @throws IOException In case of any exception.
+     */
+    public int read(long pos, byte[] buf, int off, int len) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java
new file mode 100644
index 0000000..21e2710
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsFileRange.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * Entity representing a part of a GGFS file identified by file path, start position, and length.
+ */
+public class IgniteFsFileRange {
+    /** File path. */
+    private IgniteFsPath path;
+
+    /** Start position. */
+    private long start;
+
+    /** Length. */
+    private long len;
+
+    /**
+     * Creates a file range; the arguments are stored as is, without validation.
+     *
+     * @param path File path.
+     * @param start Start position.
+     * @param len Length.
+     */
+    public IgniteFsFileRange(IgniteFsPath path, long start, long len) {
+        this.path = path;
+        this.start = start;
+        this.len = len;
+    }
+
+    /**
+     * Gets file path.
+     *
+     * @return File path.
+     */
+    public IgniteFsPath path() {
+        return path;
+    }
+
+    /**
+     * Gets range start position.
+     *
+     * @return Start position.
+     */
+    public long start() {
+        return start;
+    }
+
+    /**
+     * Gets range length.
+     *
+     * @return Length.
+     */
+    public long length() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteFsFileRange.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java
new file mode 100644
index 0000000..75cf6d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsInputStreamJobAdapter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.*;
+
+import java.io.*;
+
+/**
+ * Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.ignitefs.IgniteFsInputStream} to bytes within
+ * the {@link IgniteFsFileRange} assigned to the job.
+ * <p>
+ * Under the covers it simply seeks the job's {@link org.apache.ignite.ignitefs.IgniteFsInputStream} to range start and
+ * wraps it into {@link IgniteFsRangeInputStream} created for the assigned range.
+ */
+public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter {
+    /** {@inheritDoc} */
+    @Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in)
+        throws IgniteException, IOException {
+        in.seek(range.start());
+
+        return execute(ggfs, new IgniteFsRangeInputStream(in, range));
+    }
+
+    /**
+     * Executes this job.
+     *
+     * @param ggfs GGFS instance.
+     * @param in Range input stream wrapping the job's input stream and the assigned range.
+     * @return Execution result.
+     * @throws IgniteException If execution failed.
+     * @throws IOException If IO exception encountered while working with stream.
+     */
+    public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws IgniteException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java
new file mode 100644
index 0000000..e1b481d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJob.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+
+import java.io.*;
+
+/**
+ * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the
+ * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods.
+ * <p>
+ * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.ignitefs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this
+ * job is expected to operate on, and already opened {@link org.apache.ignite.ignitefs.IgniteFsInputStream} for the file this range belongs to.
+ * <p>
+ * Note that provided input stream has position already adjusted to range start. However, it will not
+ * automatically stop on range end. This is done to provide capability in some cases to look beyond
+ * the range end or seek position before the range start.
+ * <p>
+ * In majority of the cases, when you want to process only provided range, you should explicitly control amount
+ * of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates
+ * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with
+ * {@link IgniteFsRangeInputStream}.
+ * <p>
+ * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations.
+ */
+public interface IgniteFsJob {
+    /**
+     * Executes this job.
+     *
+     * @param ggfs GGFS instance.
+     * @param range File range aligned to record boundaries.
+     * @param in Input stream for split file. This input stream is not aligned to range and points to file start
+     *     by default. NOTE(review): the interface description says the position is adjusted to range start - confirm.
+     * @return Execution result.
+     * @throws IgniteException If execution failed.
+     * @throws IOException If file system operation resulted in IO exception.
+     */
+    public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws IgniteException,
+        IOException;
+
+    /**
+     * This method is called when system detects that completion of this
+     * job can no longer alter the overall outcome (for example, when parent task
+     * has already reduced the results). Job is also cancelled when
+     * {@link org.apache.ignite.compute.ComputeTaskFuture#cancel()} is called.
+     * <p>
+     * Note that job cancellation is only a hint, and just like with
+     * {@link Thread#interrupt()} method, it is really up to the actual job
+     * instance to gracefully finish execution and exit.
+     */
+    public void cancel();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java
new file mode 100644
index 0000000..ba907f2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsJobAdapter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+/**
+ * Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method.
+ */
+public abstract class IgniteFsJobAdapter implements IgniteFsJob {
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        // No-op: the adapter ignores cancellation; subclasses may override this method to react to it.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java
new file mode 100644
index 0000000..b6df002
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRangeInputStream.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Decorator for regular {@link org.apache.ignite.ignitefs.IgniteFsInputStream} which streams only data within the given range.
+ * This stream is used by the {@link IgniteFsInputStreamJobAdapter} convenience adapter to create
+ * jobs which work only with the assigned range. You can also use it explicitly when
+ * working with {@link IgniteFsJob} directly.
+ */
+public final class IgniteFsRangeInputStream extends IgniteFsInputStream {
+    /** Base input stream. */
+    private final IgniteFsInputStream is;
+
+    /** Start position of the range within the base stream. */
+    private final long start;
+
+    /** Maximum stream length. */
+    private final long maxLen;
+
+    /** Current position within the stream, relative to {@link #start}. */
+    private long pos;
+
+    /**
+     * Creates stream bounded to {@code [start, start + maxLen)} and seeks base stream to {@code start}.
+     *
+     * @param is Base input stream.
+     * @param start Start position, must be non-negative and less than base stream length.
+     * @param maxLen Maximum stream length, must be non-negative and fit into the base stream after start.
+     * @throws IOException In case of exception.
+     * @throws IllegalArgumentException If any of the arguments violates the constraints above.
+     */
+    public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException {
+        if (is == null)
+            throw new IllegalArgumentException("Input stream cannot be null.");
+
+        if (start < 0)
+            throw new IllegalArgumentException("Start position cannot be negative.");
+
+        if (start >= is.length())
+            throw new IllegalArgumentException("Start position cannot be greater than or equal to file length.");
+
+        if (maxLen < 0)
+            throw new IllegalArgumentException("Length cannot be negative.");
+
+        if (maxLen > is.length() - start) // Same as start + maxLen > is.length(), but cannot overflow.
+            throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length.");
+
+        this.is = is;
+        this.start = start;
+        this.maxLen = maxLen;
+
+        is.seek(start);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long length() {
+        return is.length(); // Length of the whole base stream, not of the range.
+    }
+
+    /**
+     * Creates stream bounded to the given file range.
+     *
+     * @param is Base input stream.
+     * @param range File range; its start and length are used as stream start and maximum length.
+     * @throws IOException In case of exception.
+     */
+    public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException {
+        this(is, range.start(), range.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read() throws IOException {
+        // Range is exhausted: report end of stream without touching the base stream.
+        if (pos >= maxLen)
+            return -1;
+
+        int res = is.read();
+
+        if (res != -1)
+            pos++;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(@NotNull byte[] b, int off, int len) throws IOException {
+        if (pos >= maxLen)
+            return -1;
+
+        // Never read past the end of the range.
+        len = (int)Math.min(len, maxLen - pos);
+
+        int res = is.read(b, off, len);
+
+        if (res != -1)
+            pos += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+        seek(pos); // Position is relative to the range start.
+
+        return read(buf, off, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(long pos, byte[] buf) throws IOException {
+        readFully(pos, buf, 0, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+        seek(pos);
+
+        for (int readBytes = 0; readBytes < len;) {
+            int read = read(buf, off + readBytes, len - readBytes);
+
+            if (read == -1)
+                throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos +
+                    ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']');
+
+            readBytes += read;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void seek(long pos) throws IOException {
+        if (pos < 0)
+            throw new IOException("Seek position cannot be negative: " + pos);
+
+        is.seek(start + pos); // Translate range-relative position into base stream position.
+
+        this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long position() {
+        return pos;
+    }
+
+    /**
+     * Returns the offset in the original (base) input stream at which this range input stream starts,
+     * since range input stream represents only a part of a larger file stream.
+     *
+     * @return Start offset in original input stream.
+     */
+    public long startOffset() {
+        return start;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int available() {
+        long l = maxLen - pos;
+
+        if (l < 0) // Seek does not bound the position, so it may be past the range end.
+            return 0;
+
+        if (l > Integer.MAX_VALUE)
+            return Integer.MAX_VALUE;
+
+        return (int)l;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        is.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteFsRangeInputStream.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java
new file mode 100644
index 0000000..3efd411
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsRecordResolver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.ignitefs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain
+ * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual
+ * execution in order to adjust record boundaries in a way consistent with user data.
+ * <p>
+ * E.g., you may want to split your task into jobs so that each job processes zero, one or several lines of a file.
+ * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver
+ * you can adjust job range so that it covers the whole line(s).
+ * <p>
+ * The following record resolvers are available out of the box:
+ * <ul>
+ *     <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li>
+ *     <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li>
+ *     <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li>
+ *     <li>{@link org.apache.ignite.ignitefs.mapreduce.records.IgniteFsNewLineRecordResolver}</li>
+ * </ul>
+ */
+public interface IgniteFsRecordResolver extends Serializable {
+    /**
+     * Adjusts record start offset and length.
+     *
+     * @param fs IgniteFs instance to use.
+     * @param stream Input stream for split file.
+     * @param suggestedRecord Suggested file range whose start offset and length are to be adjusted.
+     * @return New adjusted record. If this method returns {@code null}, original record is ignored.
+     * @throws IgniteException If resolve failed.
+     * @throws IOException If resolve failed with an IO exception.
+     */
+    @Nullable public IgniteFsFileRange resolveRecords(IgniteFs fs, IgniteFsInputStream stream,
+        IgniteFsFileRange suggestedRecord) throws IgniteException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java
new file mode 100644
index 0000000..88463e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTask.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.ignitefs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.fs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task
+ * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing
+ * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement
+ * {@link IgniteFsTask#createJob(org.apache.ignite.ignitefs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
+ * <p>
+ * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of
+ * consecutive bytes located on a single node (see {@code IgniteFsGroupDataBlocksKeyMapper}). In case maximum range size
+ * is provided (either through {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()}
+ * argument), then ranges could be further divided into smaller chunks.
+ * <p>
+ * Once file is split into ranges, each range is passed to {@code IgniteFsTask.createJob()} method in order to create
+ * an {@link IgniteFsJob}.
+ * <p>
+ * Finally all generated jobs are sent to Grid nodes for execution.
+ * <p>
+ * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step.
+ * <p>
+ * Here is an example of such a task:
+ * <pre name="code" class="java">
+ * public class WordCountTask extends IgniteFsTask&lt;String, Integer&gt; {
+ *     &#64;Override
+ *     public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, IgniteFsTaskArgs&lt;String&gt; args) throws IgniteException {
+ *         // New job will be created for each range within each file.
+ *         // We pass user-provided argument (which is essentially a word to look for) to that job.
+ *         return new WordCountJob(args.userArgument());
+ *     }
+ *
+ *     // Aggregate results into one compound result.
+ *     public Integer reduce(List&lt;GridComputeJobResult&gt; results) throws IgniteCheckedException {
+ *         Integer total = 0;
+ *
+ *         for (GridComputeJobResult res : results) {
+ *             Integer cnt = res.getData();
+ *
+ *             // Null can be returned for non-existent file in case we decide to ignore such situations.
+ *             if (cnt != null)
+ *                 total += cnt;
+ *         }
+ *
+ *         return total;
+ *     }
+ * }
+ * </pre>
+ */
+public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Injected grid. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable IgniteFsTaskArgs<T> args) {
+        assert ignite != null;
+        assert args != null;
+
+        IgniteFs fs = ignite.fileSystem(args.ggfsName());
+        IgniteFsProcessorAdapter ggfsProc = ((IgniteKernal) ignite).context().ggfs();
+
+        Map<ComputeJob, ClusterNode> splitMap = new HashMap<>();
+
+        Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid);
+
+        for (IgniteFsPath path : args.paths()) {
+            IgniteFsFile file = fs.info(path);
+
+            if (file == null) {
+                if (args.skipNonExistentFiles())
+                    continue;
+                else
+                    throw new IgniteException("Failed to process IgniteFs file because it doesn't exist: " + path);
+            }
+
+            Collection<IgniteFsBlockLocation> aff = fs.affinity(path, 0, file.length(), args.maxRangeLength());
+
+            long totalLen = 0;
+
+            for (IgniteFsBlockLocation loc : aff) {
+                ClusterNode node = null;
+
+                for (UUID nodeId : loc.nodeIds()) {
+                    node = nodes.get(nodeId);
+
+                    if (node != null)
+                        break;
+                }
+
+                if (node == null)
+                    throw new IgniteException("Failed to find any of block affinity nodes in subgrid [loc=" + loc +
+                        ", subgrid=" + subgrid + ']');
+
+                IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args);
+
+                if (job != null) {
+                    ComputeJob jobImpl = ggfsProc.createJob(job, fs.name(), file.path(), loc.start(),
+                        loc.length(), args.recordResolver());
+
+                    splitMap.put(jobImpl, node);
+                }
+
+                totalLen += loc.length();
+            }
+
+            assert totalLen == file.length();
+        }
+
+        return splitMap;
+    }
+
+    /**
+     * Callback invoked during task map procedure to create job that will process specified split
+     * for GGFS file.
+     *
+     * @param path Path.
+     * @param range File range based on consecutive blocks. This range will be further
+     *      realigned to record boundaries on destination node.
+     * @param args Task argument.
+     * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped.
+     * @throws IgniteException If job creation failed.
+     */
+    @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range,
+        IgniteFsTaskArgs<T> args) throws IgniteException;
+
+    /**
+     * Maps list by node ID.
+     *
+     * @param subgrid Subgrid.
+     * @return Map.
+     */
+    private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) {
+        Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size());
+
+        for (ClusterNode node : subgrid)
+            res.put(node.id(), node);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java
new file mode 100644
index 0000000..4be5001
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/ignitefs/mapreduce/IgniteFsTaskArgs.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs.mapreduce;
+
+import org.apache.ignite.ignitefs.*;
+
+import java.util.*;
+
+/**
+ * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods,
+ * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is
+ * passed to {@link IgniteFsTask#createJob(org.apache.ignite.ignitefs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method.
+ * <p>
+ * Task arguments encapsulate the following data:
+ * <ul>
+ *     <li>GGFS name</li>
+ *     <li>File paths passed to {@code GridGgfs.execute()} method</li>
+ *     <li>{@link IgniteFsRecordResolver} for that task</li>
+ *     <li>Flag indicating whether to skip non-existent file paths or throw an exception</li>
+ *     <li>User-defined task argument</li>
+ *     <li>Maximum file range length for that task (see {@link org.apache.ignite.configuration.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li>
+ * </ul>
+ */
+public interface IgniteFsTaskArgs<T> {
+    /**
+     * Gets GGFS name.
+     *
+     * @return GGFS name.
+     */
+    public String ggfsName();
+
+    /**
+     * Gets file paths to process.
+     *
+     * @return File paths to process.
+     */
+    public Collection<IgniteFsPath> paths();
+
+    /**
+     * Gets record resolver for the task.
+     *
+     * @return Record resolver.
+     */
+    public IgniteFsRecordResolver recordResolver();
+
+    /**
+     * Gets flag indicating whether to fail or simply skip non-existent files.
+     *
+     * @return {@code True} if non-existent files should be skipped, {@code false} if they should cause a failure.
+     */
+    public boolean skipNonExistentFiles();
+
+    /**
+     * Gets user argument provided for task execution.
+     *
+     * @return User argument.
+     */
+    public T userArgument();
+
+    /**
+     * Gets optional maximum allowed range length, {@code 0} by default. If not specified, full range including
+     * all consecutive blocks will be used without any limitations.
+     *
+     * @return Maximum range length.
+     */
+    public long maxRangeLength();
+}


Mime
View raw message