ignite-commits mailing list archives

From voze...@apache.org
Subject [10/52] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP.
Date Fri, 13 Feb 2015 13:14:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
new file mode 100644
index 0000000..c71a644
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CachePreloadMode.*;
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Tests IGFS data space size handling: size counters, oversize behavior and preloading.
+ */
+public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
+    /** How many grids to start. */
+    private static final int GRID_CNT = 3;
+
+    /** How many files to save. */
+    private static final int FILES_CNT = 10;
+
+    /** Maximum amount of bytes that could be written to particular file. */
+    private static final int MAX_FILE_SIZE = 1024 * 10;
+
+    /** Block size. */
+    private static final int BLOCK_SIZE = 384;
+
+    /** Data cache name. */
+    private static final String DATA_CACHE_NAME = "dataCache";
+
+    /** Meta cache name. */
+    private static final String META_CACHE_NAME = "metaCache";
+
+    /** GGFS name. */
+    private static final String GGFS_NAME = "ggfs";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** GGFS management port. */
+    private static int mgmtPort;
+
+    /** Data cache mode. */
+    private CacheMode cacheMode;
+
+    /** Whether near cache is enabled (applicable for PARTITIONED cache only). */
+    private boolean nearEnabled;
+
+    /** GGFS maximum space. */
+    private long ggfsMaxData;
+
+    /** Trash purge timeout. */
+    private long trashPurgeTimeout;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cacheMode = null;
+        nearEnabled = false;
+        ggfsMaxData = 0;
+        trashPurgeTimeout = 0;
+
+        mgmtPort = 11400;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        G.stopAll(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        IgfsConfiguration ggfsCfg = new IgfsConfiguration();
+
+        ggfsCfg.setDataCacheName(DATA_CACHE_NAME);
+        ggfsCfg.setMetaCacheName(META_CACHE_NAME);
+        ggfsCfg.setName(GGFS_NAME);
+        ggfsCfg.setBlockSize(BLOCK_SIZE);
+        ggfsCfg.setFragmentizerEnabled(false);
+        ggfsCfg.setMaxSpaceSize(ggfsMaxData);
+        ggfsCfg.setTrashPurgeTimeout(trashPurgeTimeout);
+        ggfsCfg.setManagementPort(++mgmtPort);
+
+        CacheConfiguration dataCfg = defaultCacheConfiguration();
+
+        dataCfg.setName(DATA_CACHE_NAME);
+        dataCfg.setCacheMode(cacheMode);
+
+        if (cacheMode == PARTITIONED) {
+            dataCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+            dataCfg.setBackups(0);
+        }
+
+        dataCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCfg.setPreloadMode(SYNC);
+        dataCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+        dataCfg.setQueryIndexEnabled(false);
+        dataCfg.setAtomicityMode(TRANSACTIONAL);
+
+        CacheConfiguration metaCfg = defaultCacheConfiguration();
+
+        metaCfg.setName(META_CACHE_NAME);
+        metaCfg.setCacheMode(REPLICATED);
+
+        metaCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCfg.setPreloadMode(SYNC);
+        metaCfg.setQueryIndexEnabled(false);
+        metaCfg.setAtomicityMode(TRANSACTIONAL);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(metaCfg, dataCfg);
+        cfg.setGgfsConfiguration(ggfsCfg);
+
+        return cfg;
+    }
+
+    /**
+     * Perform initial startup.
+     *
+     * @throws Exception If failed.
+     */
+    private void startUp() throws Exception {
+        startGrids(GRID_CNT);
+    }
+
+    /**
+     * Ensure that PARTITIONED cache is correctly initialized.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitioned() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = true;
+
+        check();
+    }
+
+    /**
+     * Ensure that co-located cache is correctly initialized.
+     *
+     * @throws Exception If failed.
+     */
+    public void testColocated() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = false;
+
+        check();
+    }
+
+    /**
+     * Ensure that REPLICATED cache is correctly initialized.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicated() throws Exception {
+        cacheMode = REPLICATED;
+
+        check();
+    }
+
+    /**
+     * Ensure that exception is thrown in case PARTITIONED cache is oversized.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedOversize() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = true;
+
+        checkOversize();
+    }
+
+    /**
+     * Ensure that exception is thrown in case co-located cache is oversized.
+     *
+     * @throws Exception If failed.
+     */
+    public void testColocatedOversize() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = false;
+
+        checkOversize();
+    }
+
+    /**
+     * Ensure that exception is thrown in case REPLICATED cache is oversized.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedOversize() throws Exception {
+        cacheMode = REPLICATED;
+
+        checkOversize();
+    }
+
+    /**
+     * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedOversizeDelay() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = true;
+
+        checkOversizeDelay();
+    }
+
+    /**
+     * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently.
+     *
+     * @throws Exception If failed.
+     */
+    public void testColocatedOversizeDelay() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = false;
+
+        checkOversizeDelay();
+    }
+
+    /**
+     * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedOversizeDelay() throws Exception {
+        cacheMode = REPLICATED;
+
+        checkOversizeDelay();
+    }
+
+    /**
+     * Ensure that GGFS size is correctly updated in case of preloading for PARTITIONED cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPartitionedPreload() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = true;
+
+        checkPreload();
+    }
+
+    /**
+     * Ensure that GGFS size is correctly updated in case of preloading for co-located cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testColocatedPreload() throws Exception {
+        cacheMode = PARTITIONED;
+        nearEnabled = false;
+
+        checkPreload();
+    }
+
+    /**
+     * Ensure that GGFS cache size is calculated correctly.
+     *
+     * @throws Exception If failed.
+     */
+    private void check() throws Exception {
+        startUp();
+
+        // Ensure that cache was marked as GGFS data cache.
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteEx g = grid(i);
+
+            GridCacheProjectionEx cache = (GridCacheProjectionEx)g.cachex(DATA_CACHE_NAME).cache();
+
+            assert cache.isGgfsDataCache();
+        }
+
+        // Perform writes.
+        Collection<IgfsFile> files = write();
+
+        // Check sizes.
+        Map<UUID, Integer> expSizes = new HashMap<>(GRID_CNT, 1.0f);
+
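+        // Accumulate expected per-node sizes: each block counts toward every node that is primary or backup for its key.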
+        for (IgfsFile file : files) {
+            for (IgfsBlock block : file.blocks()) {
+                Collection<UUID> ids = primaryOrBackups(block.key());
+
+                for (UUID id : ids) {
+                    if (expSizes.get(id) == null)
+                        expSizes.put(id, block.length());
+                    else
+                        expSizes.put(id, expSizes.get(id) + block.length());
+                }
+            }
+        }
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID id = grid(i).localNode().id();
+
+            GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id);
+
+            int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0;
+
+            assert expSize == cache.ggfsDataSpaceUsed();
+        }
+
+        // Perform reads which could potentially be non-local.
+        byte[] buf = new byte[BLOCK_SIZE];
+
+        for (IgfsFile file : files) {
+            for (int i = 0; i < GRID_CNT; i++) {
+                int total = 0;
+
+                IgfsInputStream is = ggfs(i).open(file.path());
+
+                while (true) {
+                    int read = is.read(buf);
+
+                    if (read == -1)
+                        break;
+                    else
+                        total += read;
+                }
+
+                assert total == file.length() : "Not enough bytes read: [expected=" + file.length() + ", actual=" +
+                    total + ']';
+
+                is.close();
+            }
+        }
+
+        // Check sizes after read.
+        if (cacheMode == PARTITIONED) {
+            // No changes since the previous check for co-located cache.
+            for (int i = 0; i < GRID_CNT; i++) {
+                UUID id = grid(i).localNode().id();
+
+                GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id);
+
+                int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0;
+
+                assert expSize == cache.ggfsDataSpaceUsed();
+            }
+        }
+        else {
+            // All data must exist on each cache.
+            int totalSize = 0;
+
+            for (IgfsFile file : files)
+                totalSize += file.length();
+
+            for (int i = 0; i < GRID_CNT; i++) {
+                UUID id = grid(i).localNode().id();
+
+                GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id);
+
+                assertEquals(totalSize, cache.ggfsDataSpaceUsed());
+            }
+        }
+
+        // Delete data and ensure that all counters are 0 now.
+        for (IgfsFile file : files) {
+            ggfs(0).delete(file.path(), false);
+
+            // Wait for the actual delete to occur.
+            for (IgfsBlock block : file.blocks()) {
+                for (int i = 0; i < GRID_CNT; i++) {
+                    while (cache(grid(i).localNode().id()).peek(block.key()) != null)
+                        U.sleep(100);
+                }
+            }
+        }
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(grid(i).localNode().id());
+
+            assert 0 == cache.ggfsDataSpaceUsed() : "Size counter is not 0: " + cache.ggfsDataSpaceUsed();
+        }
+    }
+
+    /**
+     * Ensure that an exception is thrown in case of GGFS oversize.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkOversize() throws Exception {
+        ggfsMaxData = BLOCK_SIZE;
+
+        startUp();
+
+        final IgfsPath path = new IgfsPath("/file");
+
+        // This write is expected to be successful.
+        IgfsOutputStream os = ggfs(0).create(path, false);
+        os.write(chunk(BLOCK_SIZE - 1));
+        os.close();
+
+        // This write must be successful as well.
+        os = ggfs(0).append(path, false);
+        os.write(chunk(1));
+        os.close();
+
+        // This write must fail w/ exception.
+        GridTestUtils.assertThrows(log(), new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgfsOutputStream osErr = ggfs(0).append(path, false);
+
+                try {
+                    osErr.write(chunk(BLOCK_SIZE));
+                    osErr.close();
+
+                    return null;
+                }
+                catch (IOException e) {
+                    Throwable e0 = e;
+
+                    while (e0.getCause() != null)
+                        e0 = e0.getCause();
+
+                    throw (Exception)e0;
+                }
+                finally {
+                    U.closeQuiet(osErr);
+                }
+            }
+        }, IgfsOutOfSpaceException.class, "Failed to write data block (GGFS maximum data size exceeded) [used=" +
+            ggfsMaxData + ", allowed=" + ggfsMaxData + ']');
+    }
+
+    /**
+     * Ensure that the exception is either not thrown at all, or thrown with a delay, when the trash directory is non-empty.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkOversizeDelay() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        ggfsMaxData = 256;
+        trashPurgeTimeout = 2000;
+
+        startUp();
+
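+        // Scenario: a concurrent transaction locks the file metadata while the file sits in trash.
+        // The subsequent write oversizes the cache, but the delete worker purges trash within
+        // trashPurgeTimeout, so no IgfsOutOfSpaceException is expected.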
+        IgfsImpl ggfs = ggfs(0);
+
+        final IgfsPath path = new IgfsPath("/file");
+        final IgfsPath otherPath = new IgfsPath("/fileOther");
+
+        // Fill cache with data up to its limit.
+        IgfsOutputStream os = ggfs.create(path, false);
+        os.write(chunk((int)ggfsMaxData));
+        os.close();
+
+        final GridCache<IgniteUuid, IgfsFileInfo> metaCache = ggfs.context().kernalContext().cache().cache(
+            ggfs.configuration().getMetaCacheName());
+
+        // Start a transaction in a separate thread which will lock file ID.
+        final IgniteUuid id = ggfs.context().meta().fileId(path);
+        final IgfsFileInfo info = ggfs.context().meta().info(id);
+
+        final AtomicReference<Throwable> err = new AtomicReference<>();
+
+        try {
+            new Thread(new Runnable() {
+                @Override public void run() {
+                    try {
+
+                        try (IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            metaCache.get(id);
+
+                            latch.await();
+
+                            U.sleep(1000); // Sleep here so that data manager could "see" oversize.
+
+                            tx.commit();
+                        }
+                    }
+                    catch (Throwable e) {
+                        err.set(e);
+                    }
+                }
+            }).start();
+
+            // Now add file ID to trash listing so that delete worker could "see" it.
+
+            try (IgniteTx tx = metaCache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(),
+                    new IgfsListingEntry(info));
+
+                // Clear root listing.
+                metaCache.put(ROOT_ID, new IgfsFileInfo(ROOT_ID));
+
+                // Add file to trash listing.
+                IgfsFileInfo trashInfo = metaCache.get(TRASH_ID);
+
+                if (trashInfo == null)
+                    metaCache.put(TRASH_ID, new IgfsFileInfo(listing, new IgfsFileInfo(TRASH_ID)));
+                else
+                    metaCache.put(TRASH_ID, new IgfsFileInfo(listing, trashInfo));
+
+                tx.commit();
+            }
+
+            assert metaCache.get(TRASH_ID) != null;
+
+            // Now the file is locked and located in trash; try adding some more data.
+            os = ggfs.create(otherPath, false);
+            os.write(new byte[1]);
+
+            latch.countDown();
+
+            os.close();
+
+            assert err.get() == null;
+        }
+        finally {
+            latch.countDown(); // Safety.
+        }
+    }
+
+    /**
+     * Ensure that GGFS size is correctly updated in case of preloading.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkPreload() throws Exception {
+        assert cacheMode == PARTITIONED;
+
+        startUp();
+
+        // Perform writes.
+        Collection<IgfsFile> files = write();
+
+        // Check sizes.
+        Map<UUID, Integer> expSizes = new HashMap<>(GRID_CNT, 1.0f);
+
+        for (IgfsFile file : files) {
+            for (IgfsBlock block : file.blocks()) {
+                Collection<UUID> ids = primaryOrBackups(block.key());
+
+                for (UUID id : ids) {
+                    if (expSizes.get(id) == null)
+                        expSizes.put(id, block.length());
+                    else
+                        expSizes.put(id, expSizes.get(id) + block.length());
+                }
+            }
+        }
+
+        info("Size map before node start: " + expSizes);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID id = grid(i).localNode().id();
+
+            GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id);
+
+            int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0;
+
+            assertEquals(expSize, cache.ggfsDataSpaceUsed());
+        }
+
+        // Start a node.
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT - 1);
+
+        for (int i = 0; i < GRID_CNT - 1; i++) {
+            grid(0).events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    latch.countDown();
+
+                    return true;
+                }
+            }, EVT_CACHE_PRELOAD_STOPPED);
+        }
+
+        Ignite g = startGrid(GRID_CNT);
+
+        info("Started grid: " + g.cluster().localNode().id());
+
+        U.awaitQuiet(latch);
+
+        // Wait until partitions are evicted.
+        awaitPartitionMapExchange();
+
+        // Check sizes again.
+        expSizes.clear();
+
+        for (IgfsFile file : files) {
+            for (IgfsBlock block : file.blocks()) {
+                Collection<UUID> ids = primaryOrBackups(block.key());
+
+                assert !ids.isEmpty();
+
+                for (UUID id : ids) {
+                    if (expSizes.get(id) == null)
+                        expSizes.put(id, block.length());
+                    else
+                        expSizes.put(id, expSizes.get(id) + block.length());
+                }
+            }
+        }
+
+        info("Size map after node start: " + expSizes);
+
+        for (int i = 0; i < GRID_CNT - 1; i++) {
+            UUID id = grid(i).localNode().id();
+
+            GridCacheAdapter<IgfsBlockKey, byte[]> cache = cache(id);
+
+            int expSize = expSizes.get(id) != null ? expSizes.get(id) : 0;
+
+            assertEquals("For node: " + id, expSize, cache.ggfsDataSpaceUsed());
+        }
+    }
+
+    /**
+     * Create data chunk of the given length.
+     *
+     * @param len Length.
+     * @return Data chunk.
+     */
+    private byte[] chunk(int len) {
+        byte[] chunk = new byte[len];
+
+        for (int i = 0; i < len; i++)
+            chunk[i] = (byte)i;
+
+        return chunk;
+    }
+
+    /**
+     * Create block key.
+     *
+     * @param path Path.
+     * @param blockId Block ID.
+     * @return Block key.
+     * @throws Exception If failed.
+     */
+    private IgfsBlockKey blockKey(IgfsPath path, long blockId) throws Exception {
+        IgfsEx ggfs0 = (IgfsEx)grid(0).fileSystem(GGFS_NAME);
+
+        IgniteUuid fileId = ggfs0.context().meta().fileId(path);
+
+        return new IgfsBlockKey(fileId, null, true, blockId);
+    }
+
+    /**
+     * Determine primary node for the given block key.
+     *
+     * @param key Block key.
+     * @return Node ID.
+     */
+    private UUID primary(IgfsBlockKey key) {
+        IgniteEx grid = grid(0);
+
+        for (ClusterNode node : grid.nodes()) {
+            if (grid.cachex(DATA_CACHE_NAME).affinity().isPrimary(node, key))
+                return node.id();
+        }
+
+        return null;
+    }
+
+    /**
+     * Determine primary and backup node IDs for the given block key.
+     *
+     * @param key Block key.
+     * @return Collection of node IDs.
+     */
+    private Collection<UUID> primaryOrBackups(IgfsBlockKey key) {
+        IgniteEx grid = grid(0);
+
+        Collection<UUID> ids = new HashSet<>();
+
+        for (ClusterNode node : grid.nodes()) {
+            if (grid.cachex(DATA_CACHE_NAME).affinity().isPrimaryOrBackup(node, key))
+                ids.add(node.id());
+        }
+
+        return ids;
+    }
+
+    /**
+     * Get GGFS instance of the node with the given index.
+     *
+     * @param idx Node index.
+     * @return GGFS.
+     * @throws Exception If failed.
+     */
+    private IgfsImpl ggfs(int idx) throws Exception {
+        return (IgfsImpl)grid(idx).fileSystem(GGFS_NAME);
+    }
+
+    /**
+     * Get GGFS instance of the given node.
+     *
+     * @param ignite Node.
+     * @return GGFS.
+     * @throws Exception If failed.
+     */
+    private IgfsImpl ggfs(Ignite ignite) throws Exception {
+        return (IgfsImpl) ignite.fileSystem(GGFS_NAME);
+    }
+
+    /**
+     * Get data cache for the given node ID.
+     *
+     * @param nodeId Node ID.
+     * @return Data cache.
+     */
+    private GridCacheAdapter<IgfsBlockKey, byte[]> cache(UUID nodeId) {
+        return (GridCacheAdapter<IgfsBlockKey, byte[]>)((IgniteEx)G.ignite(nodeId)).cachex(DATA_CACHE_NAME)
+            .<IgfsBlockKey, byte[]>cache();
+    }
+
+    /**
+     * Write test files to the file system.
+     *
+     * @return Collection of written file descriptors.
+     * @throws Exception If failed.
+     */
+    private Collection<IgfsFile> write() throws Exception {
+        Collection<IgfsFile> res = new HashSet<>(FILES_CNT, 1.0f);
+
+        ThreadLocalRandom8 rand = ThreadLocalRandom8.current();
+
+        for (int i = 0; i < FILES_CNT; i++) {
+            // Create empty file locally.
+            IgfsPath path = new IgfsPath("/file-" + i);
+
+            ggfs(0).create(path, false).close();
+
+            IgfsMetaManager meta = ggfs(0).context().meta();
+
+            IgniteUuid fileId = meta.fileId(path);
+
+            // Calculate file blocks.
+            int fileSize = rand.nextInt(MAX_FILE_SIZE);
+
+            int fullBlocks = fileSize / BLOCK_SIZE;
+            int remainderSize = fileSize % BLOCK_SIZE;
+
+            Collection<IgfsBlock> blocks = new ArrayList<>(fullBlocks + (remainderSize > 0 ? 1 : 0));
+
+            for (int j = 0; j < fullBlocks; j++)
+                blocks.add(new IgfsBlock(new IgfsBlockKey(fileId, null, true, j), BLOCK_SIZE));
+
+            if (remainderSize > 0)
+                blocks.add(new IgfsBlock(new IgfsBlockKey(fileId, null, true, fullBlocks), remainderSize));
+
+            IgfsFile file = new IgfsFile(path, fileSize, blocks);
+
+            // Actual write.
+            for (IgfsBlock block : blocks) {
+                IgfsOutputStream os = ggfs(0).append(path, false);
+
+                os.write(chunk(block.length()));
+
+                os.close();
+            }
+
+            // Add written file to the result set.
+            res.add(file);
+        }
+
+        return res;
+    }
+
+    /** A file written to the file system. */
+    private static class IgfsFile {
+        /** Path to the file. */
+        private final IgfsPath path;
+
+        /** File length. */
+        private final int len;
+
+        /** Blocks with their corresponding locations. */
+        private final Collection<IgfsBlock> blocks;
+
+        /**
+         * Constructor.
+         *
+         * @param path Path.
+         * @param len Length.
+         * @param blocks Blocks.
+         */
+        private IgfsFile(IgfsPath path, int len, Collection<IgfsBlock> blocks) {
+            this.path = path;
+            this.len = len;
+            this.blocks = blocks;
+        }
+
+        /** @return Path. */
+        IgfsPath path() {
+            return path;
+        }
+
+        /** @return Length. */
+        int length() {
+            return len;
+        }
+
+        /** @return Blocks. */
+        Collection<IgfsBlock> blocks() {
+            return blocks;
+        }
+    }
+
+    /** Block written to the file system. */
+    private static class IgfsBlock {
+        /** Block key. */
+        private final IgfsBlockKey key;
+
+        /** Block length. */
+        private final int len;
+
+        /**
+         * Constructor.
+         *
+         * @param key Block key.
+         * @param len Block length.
+         */
+        private IgfsBlock(IgfsBlockKey key, int len) {
+            this.key = key;
+            this.len = len;
+        }
+
+        /** @return Block key. */
+        private IgfsBlockKey key() {
+            return key;
+        }
+
+        /** @return Block length. */
+        private int length() {
+            return len;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
new file mode 100644
index 0000000..3810f41
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStreamsSelfTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.testframework.GridTestUtils.*;
+
+/**
+ * Tests for GGFS stream content.
+ */
+public class IgfsStreamsSelfTest extends IgfsCommonAbstractTest {
+    /** Test IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Meta-information cache name. */
+    private static final String META_CACHE_NAME = "replicated";
+
+    /** Data cache name. */
+    public static final String DATA_CACHE_NAME = "data";
+
+    /** Group size. */
+    public static final int CFG_GRP_SIZE = 128;
+
+    /** Pre-configured block size. */
+    private static final int CFG_BLOCK_SIZE = 64000;
+
+    /** Number of threads to test parallel writing. */
+    private static final int WRITING_THREADS_CNT = 5;
+
+    /** Number of threads to test parallel reading. */
+    private static final int READING_THREADS_CNT = 5;
+
+    /** Test nodes count. */
+    private static final int NODES_CNT = 4;
+
+    /** Number of retries for async ops. */
+    public static final int ASSERT_RETRIES = 100;
+
+    /** Delay between checks for async ops. */
+    public static final int ASSERT_RETRY_INTERVAL = 100;
+
+    /** File system to test. */
+    private IgniteFs fs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        if (NODES_CNT <= 0)
+            return;
+
+        // Initialize FS.
+        fs = grid(0).fileSystem("ggfs");
+
+        // Cleanup FS.
+        fs.format();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheConfiguration(META_CACHE_NAME), cacheConfiguration(DATA_CACHE_NAME));
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        IgfsConfiguration ggfsCfg = new IgfsConfiguration();
+
+        ggfsCfg.setMetaCacheName(META_CACHE_NAME);
+        ggfsCfg.setDataCacheName(DATA_CACHE_NAME);
+        ggfsCfg.setName("ggfs");
+        ggfsCfg.setBlockSize(CFG_BLOCK_SIZE);
+        ggfsCfg.setFragmentizerEnabled(true);
+
+        cfg.setGgfsConfiguration(ggfsCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(cacheName);
+
+        if (META_CACHE_NAME.equals(cacheName))
+            cacheCfg.setCacheMode(REPLICATED);
+        else {
+            cacheCfg.setCacheMode(PARTITIONED);
+            cacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
+
+            cacheCfg.setBackups(0);
+            cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(CFG_GRP_SIZE));
+        }
+
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setQueryIndexEnabled(false);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Test GGFS construction.
+     *
+     * @throws IgniteCheckedException In case of exception.
+     */
+    public void testConfiguration() throws IgniteCheckedException {
+        GridCache metaCache = getFieldValue(fs, "meta", "metaCache");
+        GridCache dataCache = getFieldValue(fs, "data", "dataCache");
+
+        assertNotNull(metaCache);
+        assertEquals(META_CACHE_NAME, metaCache.name());
+        assertEquals(REPLICATED, metaCache.configuration().getCacheMode());
+
+        assertNotNull(dataCache);
+        assertEquals(DATA_CACHE_NAME, dataCache.name());
+        assertEquals(PARTITIONED, dataCache.configuration().getCacheMode());
+    }
+
+    /**
+     * Test file creation.
+     *
+     * @throws Exception In case of exception.
+     */
+    public void testCreateFile() throws Exception {
+        IgfsPath root = new IgfsPath("/");
+        IgfsPath path = new IgfsPath("/asdf");
+
+        long max = 100L * CFG_BLOCK_SIZE / WRITING_THREADS_CNT;
+
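+        // Grow the file size geometrically (roughly 1.5x per step) to cover sizes from 0 up to 'max'.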
+        for (long size = 0; size <= max; size = size * 15 / 10 + 1) {
+            assertEquals(Collections.<IgfsPath>emptyList(), fs.listPaths(root));
+
+            testCreateFile(path, size, new Random().nextInt());
+        }
+    }
+
+    /** @throws Exception If failed. */
+    public void testCreateFileColocated() throws Exception {
+        IgfsPath path = new IgfsPath("/colocated");
+
+        UUID uuid = UUID.randomUUID();
+
+        IgniteUuid affKey;
+
+        long idx = 0;
+
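+        // Probe affinity keys until one maps to the local node, so created blocks are stored locally.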
+        while (true) {
+            affKey = new IgniteUuid(uuid, idx);
+
+            if (grid(0).mapKeyToNode(DATA_CACHE_NAME, affKey).id().equals(grid(0).localNode().id()))
+                break;
+
+            idx++;
+        }
+
+        try (IgfsOutputStream out = fs.create(path, 1024, true, affKey, 0, 1024, null)) {
+            // Write 15MB, should be enough to test distribution.
+            for (int i = 0; i < 15; i++)
+                out.write(new byte[1024 * 1024]);
+        }
+
+        IgfsFile info = fs.info(path);
+
+        Collection<IgfsBlockLocation> affNodes = fs.affinity(path, 0, info.length());
+
+        assertEquals(1, affNodes.size());
+        Collection<UUID> nodeIds = F.first(affNodes).nodeIds();
+
+        assertEquals(1, nodeIds.size());
+        assertEquals(grid(0).localNode().id(), F.first(nodeIds));
+    }
+
+    /** @throws Exception If failed. */
+    public void testCreateFileFragmented() throws Exception {
+        IgfsEx impl = (IgfsEx)grid(0).fileSystem("ggfs");
+
+        IgfsFragmentizerManager fragmentizer = impl.context().fragmentizer();
+
+        GridTestUtils.setFieldValue(fragmentizer, "fragmentizerEnabled", false);
+
+        IgfsPath path = new IgfsPath("/file");
+
+        try {
+            IgniteFs fs0 = grid(0).fileSystem("ggfs");
+            IgniteFs fs1 = grid(1).fileSystem("ggfs");
+            IgniteFs fs2 = grid(2).fileSystem("ggfs");
+
+            try (IgfsOutputStream out = fs0.create(path, 128, false, 1, CFG_GRP_SIZE,
+                F.asMap(IgniteFs.PROP_PREFER_LOCAL_WRITES, "true"))) {
+                // 1.5 blocks.
+                byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2];
+
+                Arrays.fill(data, (byte)1);
+
+                out.write(data);
+            }
+
+            try (IgfsOutputStream out = fs1.append(path, false)) {
+                // 1.5 blocks.
+                byte[] data = new byte[CFG_BLOCK_SIZE * 3 / 2];
+
+                Arrays.fill(data, (byte)2);
+
+                out.write(data);
+            }
+
+            // After this we should have the first two blocks colocated with grid 0 and the last block colocated with grid 1.
+            IgfsFileImpl fileImpl = (IgfsFileImpl)fs.info(path);
+
+            GridCache<Object, Object> metaCache = grid(0).cachex(META_CACHE_NAME);
+
+            IgfsFileInfo fileInfo = (IgfsFileInfo)metaCache.get(fileImpl.fileId());
+
+            IgfsFileMap map = fileInfo.fileMap();
+
+            List<IgfsFileAffinityRange> ranges = map.ranges();
+
+            assertEquals(2, ranges.size());
+
+            assertTrue(ranges.get(0).startOffset() == 0);
+            assertTrue(ranges.get(0).endOffset() == 2 * CFG_BLOCK_SIZE - 1);
+
+            assertTrue(ranges.get(1).startOffset() == 2 * CFG_BLOCK_SIZE);
+            assertTrue(ranges.get(1).endOffset() == 3 * CFG_BLOCK_SIZE - 1);
+
+            // Validate data read after colocated writes.
+            try (IgfsInputStream in = fs2.open(path)) {
+                // Validate first part of file.
+                for (int i = 0; i < CFG_BLOCK_SIZE * 3 / 2; i++)
+                    assertEquals((byte)1, in.read());
+
+                // Validate second part of file.
+                for (int i = 0; i < CFG_BLOCK_SIZE * 3 / 2; i++)
+                    assertEquals((byte)2, in.read());
+
+                assertEquals(-1, in.read());
+            }
+        }
+        finally {
+            GridTestUtils.setFieldValue(fragmentizer, "fragmentizerEnabled", true);
+
+            boolean hasData = false;
+
+            for (int i = 0; i < NODES_CNT; i++)
+                hasData |= !grid(i).cachex(DATA_CACHE_NAME).isEmpty();
+
+            assertTrue(hasData);
+
+            fs.delete(path, true);
+        }
+
+        GridTestUtils.retryAssert(log, ASSERT_RETRIES, ASSERT_RETRY_INTERVAL, new CAX() {
+            @Override public void applyx() {
+                for (int i = 0; i < NODES_CNT; i++)
+                    assertTrue(grid(i).cachex(DATA_CACHE_NAME).isEmpty());
+            }
+        });
+    }
+
+    /**
+     * Test file creation.
+     *
+     * @param path Path to file to store.
+     * @param size Size of file to store.
+     * @param salt Salt for file content generation.
+     * @throws Exception In case of any exception.
+     */
+    private void testCreateFile(final IgfsPath path, final long size, final int salt) throws Exception {
+        info("Create file [path=" + path + ", size=" + size + ", salt=" + salt + ']');
+
+        final AtomicInteger cnt = new AtomicInteger(0);
+        final Collection<IgfsPath> cleanUp = new ConcurrentLinkedQueue<>();
+
+        long time = runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int id = cnt.incrementAndGet();
+
+                IgfsPath f = new IgfsPath(path.parent(), "asdf" + (id > 1 ? "-" + id : ""));
+
+                try (IgfsOutputStream out = fs.create(f, 0, true, null, 0, 1024, null)) {
+                    assertNotNull(out);
+
+                    cleanUp.add(f); // Add all created into cleanup list.
+
+                    U.copy(new IgfsTestInputStream(size, salt), out);
+                }
+
+                return null;
+            }
+        }, WRITING_THREADS_CNT, "perform-multi-thread-writing");
+
+        if (time > 0) {
+            double rate = size * 1000. / time / 1024 / 1024;
+
+            info(String.format("Write file [path=%s, size=%d kB, rate=%2.1f MB/s]", path,
+                WRITING_THREADS_CNT * size / 1024, WRITING_THREADS_CNT * rate));
+        }
+
+        info("Read and validate saved file: " + path);
+
+        final InputStream expIn = new IgfsTestInputStream(size, salt);
+        final IgfsInputStream actIn = fs.open(path, CFG_BLOCK_SIZE * READING_THREADS_CNT * 11 / 10);
+
+        // Validate continuous reading of whole file.
+        assertEqualStreams(expIn, actIn, size, null);
+
+        // Validate random seek and reading.
+        final Random rnd = new Random();
+
+        runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                long skip = Math.abs(rnd.nextLong() % (size + 1));
+                long range = Math.min(size - skip, rnd.nextInt(CFG_BLOCK_SIZE * 400));
+
+                assertEqualStreams(new IgfsTestInputStream(size, salt), actIn, range, skip);
+
+                return null;
+            }
+        }, READING_THREADS_CNT, "validate-multi-thread-reading");
+
+        expIn.close();
+        actIn.close();
+
+        info("Get stored file info: " + path);
+
+        IgfsFile desc = fs.info(path);
+
+        info("Validate stored file info: " + desc);
+
+        assertNotNull(desc);
+
+        if (log.isDebugEnabled())
+            log.debug("File descriptor: " + desc);
+
+        Collection<IgfsBlockLocation> aff = fs.affinity(path, 0, desc.length());
+
+        assertFalse("Affinity: " + aff, desc.length() != 0 && aff.isEmpty());
+
+        int blockSize = desc.blockSize();
+
+        assertEquals("File size", size, desc.length());
+        assertEquals("Binary block size", CFG_BLOCK_SIZE, blockSize);
+        //assertEquals("Permission", "rwxr-xr-x", desc.getPermission().toString());
+        //assertEquals("Permission sticky bit marks this is file", false, desc.getPermission().getStickyBit());
+        assertEquals("Type", true, desc.isFile());
+        assertEquals("Type", false, desc.isDirectory());
+
+        info("Cleanup files: " + cleanUp);
+
+        for (IgfsPath f : cleanUp) {
+            fs.delete(f, true);
+            assertNull(fs.info(f));
+        }
+    }
+
+    /**
+     * Validate streams generate the same output.
+     *
+     * @param expIn Expected input stream.
+     * @param actIn Actual input stream.
+     * @param expSize Expected size of the streams.
+     * @param seek Seek position to use for position-based reading or {@code null} to use simple continuous reading.
+     * @throws IOException In case of any IO exception.
+     */
+    private void assertEqualStreams(InputStream expIn, IgfsInputStream actIn,
+        @Nullable Long expSize, @Nullable Long seek) throws IOException {
+        if (seek != null)
+            expIn.skip(seek);
+
+        int bufSize = 2345;
+        byte buf1[] = new byte[bufSize];
+        byte buf2[] = new byte[bufSize];
+        long pos = 0;
+
+        long start = System.currentTimeMillis();
+
+        while (true) {
+            int read = (int)Math.min(bufSize, expSize - pos);
+
+            int i1;
+
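+            // Exercise all read modes: continuous read, positional read (even seek) and positional readFully (odd seek).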
+            if (seek == null)
+                i1 = actIn.read(buf1, 0, read);
+            else if (seek % 2 == 0)
+                i1 = actIn.read(pos + seek, buf1, 0, read);
+            else {
+                i1 = read;
+
+                actIn.readFully(pos + seek, buf1, 0, read);
+            }
+
+            // Read at least 0 bytes, but don't read more than 'i1' or 'read'.
+            int i2 = expIn.read(buf2, 0, Math.max(0, Math.min(i1, read)));
+
+            if (i1 != i2) {
+                fail("Expects the same data [read=" + read + ", pos=" + pos + ", seek=" + seek +
+                    ", i1=" + i1 + ", i2=" + i2 + ']');
+            }
+
+            if (i1 == -1)
+                break; // EOF
+
+            // i1 == bufSize => compare buffers.
+            // i1 <  bufSize => Compare part of buffers, rest of buffers are equal from previous iteration.
+            assertTrue("Expects the same data [read=" + read + ", pos=" + pos + ", seek=" + seek +
+                ", i1=" + i1 + ", i2=" + i2 + ']', Arrays.equals(buf1, buf2));
+
+            if (read == 0)
+                break; // Nothing more to read.
+
+            pos += i1;
+        }
+
+        if (expSize != null)
+            assertEquals(expSize.longValue(), pos);
+
+        long time = System.currentTimeMillis() - start;
+
+        if (time != 0 && log.isInfoEnabled()) {
+            log.info(String.format("Streams were compared in continuous reading " +
+                "[size=%7d, rate=%3.1f MB/sec]", expSize, expSize * 1000. / time / 1024 / 1024));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
new file mode 100644
index 0000000..25400a1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.igfs.mapreduce.records.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
+
+/**
+ * Tests for {@link org.apache.ignite.igfs.mapreduce.IgfsTask}.
+ */
+public class IgfsTaskSelfTest extends IgfsCommonAbstractTest {
+    /** Predefined words dictionary. */
+    private static final String[] DICTIONARY = new String[] {"word0", "word1", "word2", "word3", "word4", "word5",
+        "word6", "word7"};
+
+    /** File path. */
+    private static final IgfsPath FILE = new IgfsPath("/file");
+
+    /** Shared IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Block size: 64 KB. */
+    private static final int BLOCK_SIZE = 64 * 1024;
+
+    /** Total words in file. */
+    private static final int TOTAL_WORDS = 2 * 1024 * 1024;
+
+    /** Node count. */
+    private static final int NODE_CNT = 4;
+
+    /** Repeat count. */
+    private static final int REPEAT_CNT = 10;
+
+    /** GGFS. */
+    private static IgniteFs ggfs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < NODE_CNT; i++) {
+            Ignite g = G.start(config(i));
+
+            if (i + 1 == NODE_CNT)
+                ggfs = g.fileSystem("ggfs");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        ggfs.format();
+    }
+
+    /**
+     * Create grid configuration.
+     *
+     * @param idx Node index.
+     * @return Grid configuration.
+     */
+    private IgniteConfiguration config(int idx) {
+        IgfsConfiguration ggfsCfg = new IgfsConfiguration();
+
+        ggfsCfg.setDataCacheName("dataCache");
+        ggfsCfg.setMetaCacheName("metaCache");
+        ggfsCfg.setName("ggfs");
+        ggfsCfg.setBlockSize(BLOCK_SIZE);
+        ggfsCfg.setDefaultMode(PRIMARY);
+        ggfsCfg.setFragmentizerEnabled(false);
+
+        CacheConfiguration dataCacheCfg = new CacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setDistributionMode(PARTITIONED_ONLY);
+        dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(1));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setQueryIndexEnabled(false);
+
+        CacheConfiguration metaCacheCfg = new CacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        metaCacheCfg.setQueryIndexEnabled(false);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setGgfsConfiguration(ggfsCfg);
+
+        cfg.setGridName("node-" + idx);
+
+        return cfg;
+    }
+
+    /**
+     * Test task.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTask() throws Exception {
+        U.sleep(3000); // TODO: Sleep in order to wait for fragmentizing to finish.
+
+        for (int i = 0; i < REPEAT_CNT; i++) {
+            String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)];
+
+            generateFile(TOTAL_WORDS);
+            Long genLen = ggfs.info(FILE).length();
+
+            IgniteBiTuple<Long, Integer> taskRes = ggfs.execute(new Task(),
+                new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg);
+
+            assert F.eq(genLen, taskRes.getKey());
+            assert F.eq(TOTAL_WORDS, taskRes.getValue());
+        }
+    }
+
+    /**
+     * Test task in async mode.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTaskAsync() throws Exception {
+        U.sleep(3000);
+
+        assertFalse(ggfs.isAsync());
+
+        IgniteFs ggfsAsync = ggfs.withAsync();
+
+        assertTrue(ggfsAsync.isAsync());
+
+        for (int i = 0; i < REPEAT_CNT; i++) {
+            String arg = DICTIONARY[new Random(System.currentTimeMillis()).nextInt(DICTIONARY.length)];
+
+            generateFile(TOTAL_WORDS);
+            Long genLen = ggfs.info(FILE).length();
+
+            assertNull(ggfsAsync.execute(
+                new Task(), new IgfsStringDelimiterRecordResolver(" "), Collections.singleton(FILE), arg));
+
+            IgniteFuture<IgniteBiTuple<Long, Integer>> fut = ggfsAsync.future();
+
+            assertNotNull(fut);
+
+            IgniteBiTuple<Long, Integer> taskRes = fut.get();
+
+            assert F.eq(genLen, taskRes.getKey());
+            assert F.eq(TOTAL_WORDS, taskRes.getValue());
+        }
+
+        ggfsAsync.format();
+
+        IgniteFuture<?> fut = ggfsAsync.future();
+
+        assertNotNull(fut);
+
+        fut.get();
+    }
+
+    /**
+     * Generate file with random words from the dictionary.
+     *
+     * @param wordCnt Word count.
+     * @throws Exception If failed.
+     */
+    private void generateFile(int wordCnt)
+        throws Exception {
+        Random rnd = new Random(System.currentTimeMillis());
+
+        try (OutputStreamWriter writer = new OutputStreamWriter(ggfs.create(FILE, true))) {
+            int cnt = 0;
+
+            while (cnt < wordCnt) {
+                String word = DICTIONARY[rnd.nextInt(DICTIONARY.length)];
+
+                writer.write(word + " ");
+
+                cnt++;
+            }
+        }
+    }
+
+    /**
+     * Task.
+     */
+    private static class Task extends IgfsTask<String, IgniteBiTuple<Long, Integer>> {
+        /** {@inheritDoc} */
+        @Override public IgfsJob createJob(IgfsPath path, IgfsFileRange range,
+            IgfsTaskArgs<String> args) {
+            return new Job();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteBiTuple<Long, Integer> reduce(List<ComputeJobResult> ress) {
+            long totalLen = 0;
+            int argCnt = 0;
+
+            for (ComputeJobResult res : ress) {
+                IgniteBiTuple<Long, Integer> res0 = (IgniteBiTuple<Long, Integer>)res.getData();
+
+                if (res0 != null) {
+                    totalLen += res0.getKey();
+                    argCnt += res0.getValue();
+                }
+            }
+
+            return F.t(totalLen, argCnt);
+        }
+    }
+
+    /**
+     * Job.
+     */
+    private static class Job implements IgfsJob, Serializable {
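+        /** Injected Ignite instance. */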
+        @IgniteInstanceResource
+        private Ignite ignite;
+
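+        /** Injected task session. */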
+        @TaskSessionResource
+        private ComputeTaskSession ses;
+
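+        /** Injected job context. */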
+        @JobContextResource
+        private ComputeJobContext ctx;
+
+        /** {@inheritDoc} */
+        @Override public Object execute(IgniteFs ggfs, IgfsFileRange range, IgfsInputStream in)
+            throws IOException {
+            assert ignite != null;
+            assert ses != null;
+            assert ctx != null;
+
+            in.seek(range.start());
+
+            byte[] buf = new byte[(int)range.length()];
+
+            int totalRead = 0;
+
+            while (totalRead < buf.length) {
+                int b = in.read();
+
+                assert b != -1;
+
+                buf[totalRead++] = (byte)b;
+            }
+
+            String str = new String(buf);
+
+            String[] chunks = str.split(" ");
+
+            int ctr = 0;
+
+            for (String chunk : chunks) {
+                if (!chunk.isEmpty())
+                    ctr++;
+            }
+
+            return F.t(range.length(), ctr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTestInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTestInputStream.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTestInputStream.java
new file mode 100644
index 0000000..14d5fcf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTestInputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import java.io.*;
+
+/**
+ * Test input stream with predictable, position-derived data output and no in-memory buffering.
+ */
+class IgfsTestInputStream extends InputStream {
+    /** Stream length in bytes. */
+    private long size;
+
+    /** Salt for input data generation. */
+    private long salt;
+
+    /** Current stream position. */
+    private long pos;
+
+    /**
+     * Constructs test input stream.
+     *
+     * @param size Stream length in bytes.
+     * @param salt Salt for input data generation.
+     */
+    IgfsTestInputStream(long size, long salt) {
+        this.size = size;
+        this.salt = salt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read() throws IOException {
+        if (pos >= size)
+            return -1;
+
+        long next = salt ^ (salt * pos++);
+
+        next ^= next >>> 32;
+        next ^= next >>> 16;
+        next ^= next >>> 8;
+
+        return (int)(0xFF & next);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long skip(long n) throws IOException {
+        long skipped = Math.min(n, size - pos);
+
+        pos += skipped;
+
+        // Per the InputStream contract, return the number of bytes actually skipped.
+        return skipped;
+    }
+}
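
Since each byte above is a pure function of (salt, position), two streams built
with equal arguments are byte-identical, and nothing is ever buffered. A minimal
sketch (assuming same-package access to the package-private class and an
enclosing method that declares IOException):

    // Sketch only.
    InputStream a = new IgfsTestInputStream(64, 42);
    InputStream b = new IgfsTestInputStream(64, 42);

    for (int i = 0; i < 64; i++)
        assert a.read() == b.read(); // Identical deterministic data.

    assert a.read() == -1; // Exhausted after 'size' bytes.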

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/package.html b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/package.html
new file mode 100644
index 0000000..6556981
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Contains internal tests or test related classes and interfaces.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsAbstractRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsAbstractRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsAbstractRecordResolverSelfTest.java
new file mode 100644
index 0000000..edf2464
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsAbstractRecordResolverSelfTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.split;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
+
+/**
+ * Base class for all split record resolver self tests.
+ */
+public class IgfsAbstractRecordResolverSelfTest extends GridCommonAbstractTest {
+    /** File path. */
+    protected static final IgfsPath FILE = new IgfsPath("/file");
+
+    /** Shared IP finder. */
+    private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** GGFS. */
+    protected static IgniteFs ggfs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgfsConfiguration ggfsCfg = new IgfsConfiguration();
+
+        ggfsCfg.setDataCacheName("dataCache");
+        ggfsCfg.setMetaCacheName("metaCache");
+        ggfsCfg.setName("ggfs");
+        ggfsCfg.setBlockSize(512);
+        ggfsCfg.setDefaultMode(PRIMARY);
+
+        CacheConfiguration dataCacheCfg = new CacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setDistributionMode(NEAR_PARTITIONED);
+        dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setQueryIndexEnabled(false);
+
+        CacheConfiguration metaCacheCfg = new CacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        metaCacheCfg.setQueryIndexEnabled(false);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName("grid");
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setGgfsConfiguration(ggfsCfg);
+
+        Ignite g = G.start(cfg);
+
+        ggfs = g.fileSystem("ggfs");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        ggfs.format();
+    }
+
+    /**
+     * Convenience method for wrapping the given int values into a byte array.
+     *
+     * @param data Data bytes.
+     * @return Byte array.
+     */
+    protected static byte[] wrap(int... data) {
+        byte[] res = new byte[data.length];
+
+        for (int i = 0; i < data.length; i++)
+            res[i] = (byte)data[i];
+
+        return res;
+    }
+
+    /**
+     * Create a byte array consisting of the given chunks.
+     *
+     * @param chunks Chunk entries where the key is the chunk bytes and the value is the number of repeats.
+     * @return Byte array.
+     */
+    protected static byte[] array(Map.Entry<byte[], Integer>... chunks) {
+        int totalSize = 0;
+
+        for (Map.Entry<byte[], Integer> chunk : chunks)
+            totalSize += chunk.getKey().length * chunk.getValue();
+
+        byte[] res = new byte[totalSize];
+
+        int pos = 0;
+
+        for (Map.Entry<byte[], Integer> chunk : chunks) {
+            for (int i = 0; i < chunk.getValue(); i++) {
+                System.arraycopy(chunk.getKey(), 0, res, pos, chunk.getKey().length);
+
+                pos += chunk.getKey().length;
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Open the file for reading and return an input stream.
+     *
+     * @return Input stream.
+     * @throws Exception In case of exception.
+     */
+    protected IgfsInputStream read() throws Exception {
+        return ggfs.open(FILE);
+    }
+
+    /**
+     * Write data to the file.
+     *
+     * @param chunks Data chunks.
+     * @throws Exception In case of exception.
+     */
+    protected void write(byte[]... chunks) throws Exception {
+        IgfsOutputStream os = ggfs.create(FILE, true);
+
+        if (chunks != null) {
+            for (byte[] chunk : chunks)
+                os.write(chunk);
+        }
+
+        os.close();
+    }
+
+    /**
+     * Create split.
+     *
+     * @param start Start position.
+     * @param len Length.
+     * @return Split.
+     */
+    protected IgfsFileRange split(long start, long len) {
+        return new IgfsFileRange(FILE, start, len);
+    }
+}
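
To illustrate the two helpers above: wrap(...) narrows int values to bytes, and
array(...) repeats keyed chunks in order. A small sketch using F.t, which builds
the Map.Entry-compatible IgniteBiTuple that the tests pass in:

    // Sketch only.
    byte[] chunk = wrap(1, 2);                           // {1, 2}
    byte[] data = array(F.t(chunk, 2), F.t(wrap(3), 1)); // {1, 2, 1, 2, 3}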

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsByteDelimiterRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsByteDelimiterRecordResolverSelfTest.java
new file mode 100644
index 0000000..5fb19dc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsByteDelimiterRecordResolverSelfTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.split;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.igfs.mapreduce.records.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+/**
+ * Byte delimiter split resolver self test.
+ */
+public class IgfsByteDelimiterRecordResolverSelfTest extends IgfsAbstractRecordResolverSelfTest {
+    /**
+     * Test split resolution when there are no delimiters in the file.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoDelimiters() throws Exception {
+        byte[] delim = wrap(2);
+        byte[] data = array(F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+    }
+
+    /**
+     * Test split resolution when there is one delimiter at the head.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHeadDelimiter() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(delim, 1), F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplit(2, 10, 8, 8, data, delim);
+        assertSplit(2, 14, 8, 8, data, delim);
+
+        assertSplit(8, 4, 8, 8, data, delim);
+        assertSplit(8, 8, 8, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+    }
+
+    /**
+     * Test split when there is one delimiter at the end.
+     *
+     * @throws Exception If failed.
+     */
+    public void testEndDelimiter() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(wrap(1), 8), F.t(delim, 1));
+
+        assertSplit(0, 4, 0, 16, data, delim);
+        assertSplit(0, 8, 0, 16, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplitNull(2, 10, data, delim);
+        assertSplitNull(2, 14, data, delim);
+
+        assertSplitNull(8, 4, data, delim);
+        assertSplitNull(8, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+    }
+
+    /**
+     * Test split when there is one delimiter in the middle.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMiddleDelimiter() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(wrap(1), 8), F.t(delim, 1), F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 16, data, delim);
+        assertSplit(0, 8, 0, 16, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplitNull(2, 10, data, delim);
+        assertSplitNull(2, 14, data, delim);
+        assertSplit(2, 18, 16, 8, data, delim);
+        assertSplit(2, 22, 16, 8, data, delim);
+
+        assertSplitNull(8, 4, data, delim);
+        assertSplitNull(8, 8, data, delim);
+        assertSplit(8, 12, 16, 8, data, delim);
+        assertSplit(8, 16, 16, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplit(10, 10, 16, 8, data, delim);
+        assertSplit(10, 14, 16, 8, data, delim);
+
+        assertSplit(16, 4, 16, 8, data, delim);
+        assertSplit(16, 8, 16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test split when there are two head delimiters.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTwoHeadDelimiters() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(delim, 2), F.t(wrap(1), 8));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplit(2, 10, 8, 8, data, delim);
+        assertSplit(2, 14, 8, 8, data, delim);
+        assertSplit(2, 18, 8, 16, data, delim);
+        assertSplit(2, 22, 8, 16, data, delim);
+
+        assertSplit(8, 4, 8, 8, data, delim);
+        assertSplit(8, 8, 8, 8, data, delim);
+        assertSplit(8, 12, 8, 16, data, delim);
+        assertSplit(8, 16, 8, 16, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplit(10, 10, 16, 8, data, delim);
+        assertSplit(10, 14, 16, 8, data, delim);
+
+        assertSplit(16, 4, 16, 8, data, delim);
+        assertSplit(16, 8, 16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test split when there are two tail delimiters.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTwoTailDelimiters() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(wrap(1), 8), F.t(delim, 2));
+
+        assertSplit(0, 4, 0, 16, data, delim);
+        assertSplit(0, 8, 0, 16, data, delim);
+        assertSplit(0, 12, 0, 16, data, delim);
+        assertSplit(0, 16, 0, 16, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplitNull(2, 10, data, delim);
+        assertSplitNull(2, 14, data, delim);
+        assertSplit(2, 18, 16, 8, data, delim);
+        assertSplit(2, 22, 16, 8, data, delim);
+
+        assertSplitNull(8, 4, data, delim);
+        assertSplitNull(8, 8, data, delim);
+        assertSplit(8, 12, 16, 8, data, delim);
+        assertSplit(8, 16, 16, 8, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplit(10, 10, 16, 8, data, delim);
+        assertSplit(10, 14, 16, 8, data, delim);
+
+        assertSplit(16, 4, 16, 8, data, delim);
+        assertSplit(16, 8, 16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test split when there is one head delimiter, one tail delimiter and some data between them.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHeadAndTailDelimiters() throws Exception {
+        byte[] delim = array(F.t(wrap(2), 8));
+        byte[] data = array(F.t(delim, 1), F.t(wrap(1), 8), F.t(delim, 1));
+
+        assertSplit(0, 4, 0, 8, data, delim);
+        assertSplit(0, 8, 0, 8, data, delim);
+        assertSplit(0, 12, 0, 24, data, delim);
+        assertSplit(0, 16, 0, 24, data, delim);
+        assertSplit(0, 20, 0, 24, data, delim);
+        assertSplit(0, 24, 0, 24, data, delim);
+
+        assertSplitNull(2, 2, data, delim);
+        assertSplitNull(2, 6, data, delim);
+        assertSplit(2, 10, 8, 16, data, delim);
+        assertSplit(2, 14, 8, 16, data, delim);
+        assertSplit(2, 18, 8, 16, data, delim);
+        assertSplit(2, 22, 8, 16, data, delim);
+
+        assertSplit(8, 4, 8, 16, data, delim);
+        assertSplit(8, 8, 8, 16, data, delim);
+        assertSplit(8, 12, 8, 16, data, delim);
+        assertSplit(8, 16, 8, 16, data, delim);
+
+        assertSplitNull(10, 2, data, delim);
+        assertSplitNull(10, 6, data, delim);
+        assertSplitNull(10, 10, data, delim);
+        assertSplitNull(10, 14, data, delim);
+
+        assertSplitNull(16, 4, data, delim);
+        assertSplitNull(16, 8, data, delim);
+
+        assertSplitNull(18, 2, data, delim);
+        assertSplitNull(18, 6, data, delim);
+    }
+
+    /**
+     * Test the special case when the delimiter starts with the same bytes as the preceding data byte.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelimiterStartsWithTheSameBytesAsLastPreviousDataByte() throws Exception {
+        byte[] delim = array(F.t(wrap(1, 1, 2), 1));
+        byte[] data = array(F.t(wrap(1), 1), F.t(delim, 1), F.t(wrap(1), 1));
+
+        assertSplit(0, 1, 0, 4, data, delim);
+        assertSplit(0, 2, 0, 4, data, delim);
+        assertSplit(0, 4, 0, 4, data, delim);
+        assertSplit(0, 5, 0, 5, data, delim);
+
+        assertSplit(1, 4, 4, 1, data, delim);
+    }
+
+    /**
+     * Check split resolution.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param expStart Expected start.
+     * @param expLen Expected length.
+     * @param data File data.
+     * @param delims Delimiters.
+     * @throws Exception If failed.
+     */
+    public void assertSplit(long suggestedStart, long suggestedLen, long expStart, long expLen, byte[] data,
+        byte[]... delims) throws Exception {
+        write(data);
+
+        IgfsByteDelimiterRecordResolver rslvr = resolver(delims);
+
+        IgfsFileRange split;
+
+        try (IgfsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split != null : "Split is null.";
+        assert split.start() == expStart : "Incorrect start [expected=" + expStart + ", actual=" + split.start() + ']';
+        assert split.length() == expLen : "Incorrect length [expected=" + expLen + ", actual=" + split.length() + ']';
+    }
+
+    /**
+     * Check that the split resolution resulted in {@code null}.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param data File data.
+     * @param delims Delimiters.
+     * @throws Exception If failed.
+     */
+    public void assertSplitNull(long suggestedStart, long suggestedLen, byte[] data, byte[]... delims)
+        throws Exception {
+        write(data);
+
+        IgfsByteDelimiterRecordResolver rslvr = resolver(delims);
+
+        IgfsFileRange split;
+
+        try (IgfsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split == null : "Split is not null.";
+    }
+
+    /**
+     * Create resolver.
+     *
+     * @param delims Delimiters.
+     * @return Resolver.
+     */
+    private IgfsByteDelimiterRecordResolver resolver(byte[]... delims) {
+        return new IgfsByteDelimiterRecordResolver(delims);
+    }
+}
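
A hedged reading of the expectations above (taking testHeadDelimiter: a
delimiter D of 8 bytes followed by 8 data bytes): record boundaries sit at the
file start, at the byte after each delimiter, and at EOF; a suggested range
snaps both ends forward to the nearest boundary and resolves to null when the
snapped range is empty.

    offset: 0 ........ 8 ........ 16
    data:   [ D D D D ][ 1 1 1 1 ]

    [2, 12) -> start snaps to 8, end snaps to 16 -> split (8, 8)
    [2, 4)  -> start snaps to 8 >= end           -> null (owned by the prior split)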

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsFixedLengthRecordResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsFixedLengthRecordResolverSelfTest.java
new file mode 100644
index 0000000..03ac32c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/split/IgfsFixedLengthRecordResolverSelfTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.split;
+
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.mapreduce.*;
+import org.apache.ignite.igfs.mapreduce.records.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+/**
+ * Fixed length split resolver self test.
+ */
+public class IgfsFixedLengthRecordResolverSelfTest extends IgfsAbstractRecordResolverSelfTest {
+    /**
+     * Test split resolver.
+     *
+     * @throws Exception If failed.
+     */
+    public void testResolver() throws Exception {
+        byte[] data = array(F.t(wrap(1), 24));
+
+        assertSplit(0, 4, 0, 8, data, 8);
+        assertSplit(0, 8, 0, 8, data, 8);
+        assertSplit(0, 12, 0, 16, data, 8);
+        assertSplit(0, 16, 0, 16, data, 8);
+        assertSplit(0, 20, 0, 24, data, 8);
+        assertSplit(0, 24, 0, 24, data, 8);
+        assertSplit(0, 28, 0, 24, data, 8);
+        assertSplit(0, 32, 0, 24, data, 8);
+
+        assertSplitNull(2, 2, data, 8);
+        assertSplitNull(2, 6, data, 8);
+        assertSplit(2, 10, 8, 8, data, 8);
+        assertSplit(2, 14, 8, 8, data, 8);
+        assertSplit(2, 18, 8, 16, data, 8);
+        assertSplit(2, 22, 8, 16, data, 8);
+        assertSplit(2, 26, 8, 16, data, 8);
+        assertSplit(2, 30, 8, 16, data, 8);
+
+        assertSplit(8, 4, 8, 8, data, 8);
+        assertSplit(8, 8, 8, 8, data, 8);
+        assertSplit(8, 12, 8, 16, data, 8);
+        assertSplit(8, 16, 8, 16, data, 8);
+        assertSplit(8, 20, 8, 16, data, 8);
+        assertSplit(8, 24, 8, 16, data, 8);
+
+        assertSplitNull(10, 2, data, 8);
+        assertSplitNull(10, 6, data, 8);
+        assertSplit(10, 10, 16, 8, data, 8);
+        assertSplit(10, 14, 16, 8, data, 8);
+        assertSplit(10, 18, 16, 8, data, 8);
+        assertSplit(10, 22, 16, 8, data, 8);
+
+        assertSplit(16, 4, 16, 8, data, 8);
+        assertSplit(16, 8, 16, 8, data, 8);
+        assertSplit(16, 12, 16, 8, data, 8);
+        assertSplit(16, 16, 16, 8, data, 8);
+
+        assertSplitNull(18, 2, data, 8);
+        assertSplitNull(18, 6, data, 8);
+        assertSplitNull(18, 10, data, 8);
+        assertSplitNull(18, 14, data, 8);
+
+        assertSplitNull(24, 4, data, 8);
+        assertSplitNull(24, 8, data, 8);
+
+        assertSplitNull(26, 2, data, 8);
+        assertSplitNull(26, 6, data, 8);
+    }
+
+    /**
+     * Check split resolution.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param expStart Expected start.
+     * @param expLen Expected length.
+     * @param data File data.
+     * @param len Record length.
+     * @throws Exception If failed.
+     */
+    public void assertSplit(long suggestedStart, long suggestedLen, long expStart, long expLen, byte[] data, int len)
+        throws Exception {
+        write(data);
+
+        IgfsFixedLengthRecordResolver rslvr = resolver(len);
+
+        IgfsFileRange split;
+
+        try (IgfsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split != null : "Split is null.";
+        assert split.start() == expStart : "Incorrect start [expected=" + expStart + ", actual=" + split.start() + ']';
+        assert split.length() == expLen : "Incorrect length [expected=" + expLen + ", actual=" + split.length() + ']';
+    }
+
+    /**
+     * Check that the split resolution resulted in {@code null}.
+     *
+     * @param suggestedStart Suggested start.
+     * @param suggestedLen Suggested length.
+     * @param data File data.
+     * @param len Record length.
+     * @throws Exception If failed.
+     */
+    public void assertSplitNull(long suggestedStart, long suggestedLen, byte[] data, int len)
+        throws Exception {
+        write(data);
+
+        IgfsFixedLengthRecordResolver rslvr = resolver(len);
+
+        IgfsFileRange split;
+
+        try (IgfsInputStream is = read()) {
+            split = rslvr.resolveRecords(ggfs, is, split(suggestedStart, suggestedLen));
+        }
+
+        assert split == null : "Split is not null.";
+    }
+
+    /**
+     * Create resolver.
+     *
+     * @param len Record length.
+     * @return Resolver.
+     */
+    private IgfsFixedLengthRecordResolver resolver(int len) {
+        return new IgfsFixedLengthRecordResolver(len);
+    }
+}
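
A hedged reading of the fixed-length expectations above (record length 8, file
length 24): both ends of the suggested range round up to the next record
boundary, the end is capped at EOF, and an empty snapped range yields null.

    resolvedStart = roundUp(start, 8)
    resolvedEnd   = min(roundUp(start + len, 8), 24)

    assertSplit(2, 10, 8, 8): [2, 12) -> [8, 16), split (8, 8)
    assertSplitNull(24, 4):   [24, 28) -> [24, 24), empty -> null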

