ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [18/57] [abbrv] incubator-ignite git commit: # IGNITE-226: WIP (5)
Date Fri, 13 Feb 2015 10:54:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsLoad.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsLoad.java b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsLoad.java
new file mode 100644
index 0000000..24e05cc
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsLoad.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Hadoop file system load application.
+ * <p>
+ * Command line arguments:
+ * <ul>
+ * <li>-u {url} file system URL</li>
+ * <li>-hadoopCfg {cfg} Hadoop configuration</li>
+ * <li>-f {num} files number</li>
+ * <li>-r {num} reads number</li>
+ * <li>-w {num} writes number</li>
+ * <li>-d {num} deletes number</li>
+ * <li>-delay {delay} delay between operations in milliseconds</li>
+ * <li>-t {num} threads number</li>
+ * <li>-minSize {min size} min file size in bytes</li>
+ * <li>-maxSize {max size} max file size in bytes</li>
+ * <li>-startNode {true|false} if 'true' then starts node before execution</li>
+ * <li>-nodeCfg {cfg} configuration for started node</li>
+ * <li>-primaryOnly {true|false} if 'true' then creates files only in directory named 'primary' </li>
+ * </ul>
+ * Note: GGFS logging is disabled by default, to enable logging it is necessary to set flag
+ * 'fs.ggfs.<name>.log.enabled' in Hadoop configuration file. By default log files are created in the
+ * directory 'work/ggfs/log', this path can be changed in Hadoop configuration file using property
+ * 'fs.ggfs.<name>.log.dir'.
+ */
+public class IgfsLoad {
+    /** Default file system URL. */
+    private static final String DFLT_URL = "ggfs:///";
+
+    /** Default minimum file size in bytes. */
+    private static final int DFLT_MIN_FILE_SIZE = 100 * 1024;
+
+    /** Default maximum file size in bytes. */
+    private static final int DFLT_MAX_FILE_SIZE = 1024 * 1024;
+
+    /** Default number of files to create. */
+    private static final int DFLT_FILES_NUMBER = 1000;
+
+    /** Default number of read operations. */
+    private static final int DFLT_READS_NUMBER = 2000;
+
+    /** Default number of write operations. */
+    private static final int DFLT_WRITES_NUMBER = 2000;
+
+    /** Default number of delete operations. */
+    private static final int DFLT_DELETES_NUMBER = 100;
+
+    /** Default number of worker threads. */
+    private static final int DFLT_THREADS_NUMBER = 2;
+
+    /** Default flag for starting a node before execution. */
+    private static final boolean DFLT_START_NODE = true;
+
+    /** Default flag for creating files only in the 'primary' directory. */
+    private static final boolean DFLT_PRIMARY_ONLY = false;
+
+    /** Default node configuration path. */
+    private static final String DFLT_NODE_CFG = "config/hadoop/default-config.xml";
+
+    /** Default delay between operations in milliseconds. */
+    private static final long DFLT_DELAY = 5;
+
+    /** Default Hadoop configuration path. */
+    private static final String DFLT_HADOOP_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** Buffer size in bytes used for file create and read operations. */
+    private static final int CREATE_BUF_SIZE = 100 * 1024;
+
+    /** Directory name for files created in PRIMARY mode. */
+    private static final String DIR_PRIMARY_MODE = "primary";
+
+    /** Directory name for files created in PROXY mode. */
+    private static final String DIR_PROXY_MODE = "proxy";
+
+    /** Directory name for files created in DUAL_SYNC mode. */
+    private static final String DIR_DUAL_SYNC_MODE = "dual_sync";
+
+    /** Directory name for files created in DUAL_ASYNC mode. */
+    private static final String DIR_DUAL_ASYNC_MODE = "dual_async";
+
+    /**
+     * Main method.
+     *
+     * @param args Command line arguments.
+     * @throws Exception If error occurs.
+     */
+    public static void main(String[] args) throws Exception {
+        String url = DFLT_URL;
+
+        int filesNum = DFLT_FILES_NUMBER;
+
+        int minFileSize = DFLT_MIN_FILE_SIZE;
+
+        int maxFileSize = DFLT_MAX_FILE_SIZE;
+
+        int readsNum = DFLT_READS_NUMBER;
+
+        int writesNum = DFLT_WRITES_NUMBER;
+
+        int deletesNum = DFLT_DELETES_NUMBER;
+
+        int threadsNum = DFLT_THREADS_NUMBER;
+
+        long delay = DFLT_DELAY;
+
+        String nodeCfg = DFLT_NODE_CFG;
+
+        String hadoopCfg = DFLT_HADOOP_CFG;
+
+        boolean startNode = DFLT_START_NODE;
+
+        boolean primaryOnly = DFLT_PRIMARY_ONLY;
+
+        // Parse command line arguments. Unrecognized flags are silently ignored
+        // (the switch has no default case).
+        for (int i = 0; i < args.length; i++) {
+            String arg = args[i];
+
+            switch (arg) {
+                case "-u":
+                    url = args[++i];
+
+                    break;
+
+                case "-hadoopCfg":
+                    hadoopCfg= args[++i];
+
+                    break;
+
+                case "-f":
+                    filesNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-r":
+                    readsNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-w":
+                    writesNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-minSize":
+                    minFileSize = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-maxSize":
+                    maxFileSize = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-d":
+                    deletesNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-t":
+                    threadsNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-delay":
+                    delay = Long.parseLong(args[++i]);
+
+                    break;
+
+                case "-startNode":
+                    startNode = Boolean.parseBoolean(args[++i]);
+
+                    break;
+
+                case "-nodeCfg":
+                    nodeCfg= args[++i];
+
+                    break;
+
+                case "-primaryOnly":
+                    primaryOnly = Boolean.parseBoolean(args[++i]);
+
+                    break;
+            }
+        }
+
+        X.println("File system URL: " + url);
+        X.println("Hadoop configuration: " + hadoopCfg);
+        X.println("Primary mode only: " + primaryOnly);
+        X.println("Files number: " + filesNum);
+        X.println("Reads number: " + readsNum);
+        X.println("Writes number: " + writesNum);
+        X.println("Deletes number: " + deletesNum);
+        X.println("Min file size: " + minFileSize);
+        X.println("Max file size: " + maxFileSize);
+        X.println("Threads number: " + threadsNum);
+        X.println("Delay: " + delay);
+
+        // NOTE(review): consider adding a message explaining that minSize must not exceed maxSize.
+        if (minFileSize > maxFileSize)
+            throw new IllegalArgumentException();
+
+        Ignite ignite = null;
+
+        if (startNode) {
+            X.println("Starting node using configuration: " + nodeCfg);
+
+            ignite = G.start(U.resolveIgniteUrl(nodeCfg));
+        }
+
+        try {
+            new IgfsLoad().runLoad(url, hadoopCfg, primaryOnly, threadsNum, filesNum, readsNum, writesNum,
+                deletesNum, minFileSize, maxFileSize, delay);
+        }
+        finally {
+            // Stop the node only if this process started it above.
+            if (ignite != null)
+                G.stop(true);
+        }
+    }
+
+    /**
+     * Executes read/write/delete operations.
+     *
+     * @param url File system url.
+     * @param hadoopCfg Hadoop configuration.
+     * @param primaryOnly If {@code true} then creates files only on directory named 'primary'.
+     * @param threads Threads number.
+     * @param files Files number.
+     * @param reads Reads number.
+     * @param writes Writes number.
+     * @param deletes Deletes number.
+     * @param minSize Min file size.
+     * @param maxSize Max file size.
+     * @param delay Delay between operations.
+     * @throws Exception If some file system operation failed.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    public void runLoad(String url, String hadoopCfg, final boolean primaryOnly, int threads, int files,
+        final int reads, final int writes, final int deletes, final int minSize, final int maxSize, final long delay)
+        throws Exception {
+        Path fsPath = new Path(url);
+
+        Configuration cfg = new Configuration(true);
+
+        cfg.addResource(U.resolveIgniteUrl(hadoopCfg));
+
+        final FileSystem fs = FileSystem.get(fsPath.toUri(), cfg);
+
+        // Work in a dedicated '/fsload' directory, recreated from scratch on each run.
+        Path workDir = new Path(fsPath, "/fsload");
+
+        fs.delete(workDir, true);
+
+        fs.mkdirs(workDir, FsPermission.getDefault());
+
+        final Path[] dirs;
+
+        if (primaryOnly)
+            dirs = new Path[]{mkdir(fs, workDir, DIR_PRIMARY_MODE)};
+        else
+            dirs = new Path[]{mkdir(fs, workDir, DIR_PRIMARY_MODE), mkdir(fs, workDir, DIR_PROXY_MODE),
+                mkdir(fs, workDir, DIR_DUAL_SYNC_MODE), mkdir(fs, workDir, DIR_DUAL_ASYNC_MODE)};
+
+        try {
+            ExecutorService exec = Executors.newFixedThreadPool(threads);
+
+            Collection<Future<?>> futs = new ArrayList<>(threads);
+
+            for (int i = 0; i < threads; i++) {
+                final int filesPerThread;
+
+                // First thread additionally processes the remainder when the files
+                // number is not evenly divisible by the threads number.
+                if (i == 0 && files % threads != 0)
+                    filesPerThread = files / threads + files % threads;
+                else
+                    filesPerThread = files / threads;
+
+                futs.add(exec.submit(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        runLoad(fs, dirs, filesPerThread, reads, writes, deletes, minSize, maxSize, delay);
+
+                        return null;
+                    }
+                }));
+            }
+
+            exec.shutdown();
+
+            // Wait for all workers; report failures but keep waiting for remaining tasks.
+            for (Future<?> fut : futs) {
+                try {
+                    fut.get();
+                }
+                catch (ExecutionException e) {
+                    X.error("Error during execution: " + e);
+
+                    e.getCause().printStackTrace();
+                }
+            }
+        }
+        finally {
+            // Best-effort cleanup of the work directory.
+            try {
+                fs.delete(workDir, true);
+            }
+            catch (IOException ignored) {
+                // Ignore.
+            }
+        }
+    }
+
+    /**
+     * Executes read/write/delete operations.
+     *
+     * @param fs File system.
+     * @param dirs Directories where files should be created.
+     * @param filesNum Files number.
+     * @param reads Reads number.
+     * @param writes Writes number.
+     * @param deletes Deletes number.
+     * @param minSize Min file size.
+     * @param maxSize Max file size.
+     * @param delay Delay between operations.
+     * @throws Exception If some file system operation failed.
+     */
+    private void runLoad(FileSystem fs, Path[] dirs, int filesNum, int reads, int writes, int deletes,
+        int minSize, int maxSize, long delay) throws Exception {
+        Random random = random();
+
+        // Pre-create the files that read/write operations will cycle over; sizes are
+        // uniformly random in [minSize, maxSize).
+        List<T2<Path, Integer>> files = new ArrayList<>(filesNum);
+
+        for (int i = 0; i < filesNum; i++) {
+            int size = maxSize == minSize ? minSize : minSize + random.nextInt(maxSize - minSize);
+
+            Path file = new Path(dirs[i % dirs.length], "file-" + UUID.randomUUID());
+
+            createFile(fs, file, size, CREATE_BUF_SIZE);
+
+            files.add(new T2<>(file, size));
+        }
+
+        // Pre-create one file per requested delete operation.
+        List<Path> toDel = new ArrayList<>(deletes);
+
+        for (int i = 0; i < deletes; i++) {
+            int size = maxSize == minSize ? minSize : minSize + random.nextInt(maxSize - minSize);
+
+            Path file = new Path(dirs[i % dirs.length], "file-to-delete-" + UUID.randomUUID());
+
+            createFile(fs, file, size, CREATE_BUF_SIZE);
+
+            toDel.add(file);
+        }
+
+        // Interleave read, write and delete operations until all budgets are exhausted,
+        // sleeping 'delay' ms between iterations.
+        while (reads > 0 || writes > 0 || deletes > 0) {
+            if (reads > 0) {
+                reads--;
+
+                T2<Path, Integer> file = files.get(reads % files.size());
+
+                readFull(fs, file.get1(), CREATE_BUF_SIZE);
+
+                int fileSize = file.get2();
+
+                readRandom(fs, file.get1(), fileSize, random.nextInt(fileSize) + 1);
+            }
+
+            if (writes > 0) {
+                writes--;
+
+                T2<Path, Integer> file = files.get(writes % files.size());
+
+                overwriteFile(fs, file.get1(), file.get2(), CREATE_BUF_SIZE);
+
+                appendToFile(fs, file.get1(), random.nextInt(CREATE_BUF_SIZE) + 1);
+            }
+
+            if (deletes > 0) {
+                deletes--;
+
+                deleteFile(fs, toDel.get(deletes));
+            }
+
+            U.sleep(delay);
+        }
+    }
+
+    /**
+     * Creates file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param fileSize File size.
+     * @param bufSize Write buffer size.
+     * @throws IOException If operation failed.
+     */
+    private static void createFile(FileSystem fs, Path file, int fileSize, int bufSize) throws IOException {
+        create(fs, file, fileSize, bufSize, false);
+    }
+
+    /**
+     * Overwrites file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param fileSize File size.
+     * @param bufSize Write buffer size.
+     * @throws IOException If operation failed.
+     */
+    private static void overwriteFile(FileSystem fs, Path file, int fileSize, int bufSize) throws IOException {
+        create(fs, file, fileSize, bufSize, true);
+    }
+
+    /**
+     * Appends to file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param appendSize Append size.
+     * @throws IOException If operation failed.
+     */
+    private static void appendToFile(FileSystem fs, Path file, int appendSize) throws IOException {
+        try (FSDataOutputStream out = fs.append(file)) {
+            out.write(new byte[appendSize]);
+        }
+    }
+
+    /**
+     * Reads whole file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param bufSize Read buffer size.
+     * @throws IOException If operation failed.
+     */
+    @SuppressWarnings("StatementWithEmptyBody")
+    private static void readFull(FileSystem fs, Path file, int bufSize) throws IOException {
+        try (FSDataInputStream in = fs.open(file)) {
+            byte[] readBuf = new byte[bufSize];
+
+            // Drain the stream; the data itself is not inspected.
+            while (in.read(readBuf) > 0) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Deletes file.
+     *
+     * @param fs File system.
+     * @param path File path.
+     * @throws IOException If operation failed.
+     */
+    private static void deleteFile(FileSystem fs, Path path) throws IOException {
+        fs.delete(path, false);
+    }
+
+    /**
+     * Reads from random position.
+     *
+     * @param fs File system.
+     * @param path File path.
+     * @param fileSize File size.
+     * @param readSize Read size.
+     * @throws IOException If operation failed.
+     */
+    private static void readRandom(FileSystem fs, Path path, int fileSize, int readSize) throws IOException {
+        byte[] readBuf = new byte[readSize];
+
+        try (FSDataInputStream in = fs.open(path)) {
+            in.seek(random().nextInt(fileSize));
+
+            // NOTE(review): a single read() may return fewer than readSize bytes; the
+            // return value is ignored — presumably acceptable for load generation.
+            in.read(readBuf);
+        }
+    }
+
+    /**
+     * Creates file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param fileSize File size.
+     * @param bufSize Buffer size.
+     * @param overwrite Overwrite flag.
+     * @throws IOException If operation failed.
+     */
+    private static void create(FileSystem fs, Path file, int fileSize, int bufSize, boolean overwrite)
+        throws IOException {
+        try (FSDataOutputStream out = fs.create(file, overwrite)) {
+            int size = 0;
+
+            // Write zero-filled buffers until exactly fileSize bytes are written.
+            byte[] buf = new byte[bufSize];
+
+            while (size < fileSize) {
+                int len = Math.min(fileSize - size, bufSize);
+
+                out.write(buf, 0, len);
+
+                size += len;
+            }
+        }
+    }
+
+    /**
+     * Creates directory in the given parent directory.
+     *
+     * @param fs File system.
+     * @param parentDir Parent directory.
+     * @param dirName Directory name.
+     * @return Path for created directory.
+     * @throws IOException If operation failed.
+     */
+    private static Path mkdir(FileSystem fs, Path parentDir, String dirName) throws IOException {
+        Path path = new Path(parentDir, dirName);
+
+        fs.mkdirs(path, FsPermission.getDefault());
+
+        return path;
+    }
+
+    /**
+     * @return Thread local random.
+     */
+    private static Random random() {
+        return ThreadLocalRandom.current();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsNearOnlyMultiNodeSelfTest.java
new file mode 100644
index 0000000..db5458e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgfsNearOnlyMultiNodeSelfTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Test hadoop file system implementation.
+ */
+public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest {
+    /** Path to the default hadoop configuration. */
+    public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** Group size. */
+    public static final int GRP_SIZE = 128;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of grids configured so far; offsets the IPC port and selects the cache
+     * distribution mode (first grid is NEAR_ONLY, the rest PARTITIONED_ONLY). */
+    private int cnt;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(nodeCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        G.stopAll(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration();
+
+        ggfsCfg.setDataCacheName("partitioned");
+        ggfsCfg.setMetaCacheName("partitioned");
+        ggfsCfg.setName("ggfs");
+
+        // Each grid listens on its own shared-memory endpoint port (base port + grid index).
+        ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{
+            put("type", "shmem");
+            put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt));
+        }});
+
+        ggfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups.
+
+        cfg.setGgfsConfiguration(ggfsCfg);
+
+        cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+        // Advance the counter so the next grid gets a different port and distribution mode.
+        cnt++;
+
+        return cfg;
+    }
+
+    /** @return Node count for test. */
+    protected int nodeCount() {
+        return 4;
+    }
+
+    /**
+     * Gets cache configuration.
+     *
+     * @param gridName Grid name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String gridName) {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName("partitioned");
+        cacheCfg.setCacheMode(PARTITIONED);
+        // First configured grid is a near-only (client-like) node; all others hold data.
+        cacheCfg.setDistributionMode(cnt == 0 ? NEAR_ONLY : PARTITIONED_ONLY);
+        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(GRP_SIZE));
+        cacheCfg.setBackups(0);
+        cacheCfg.setQueryIndexEnabled(false);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Gets config of concrete File System.
+     *
+     * @return Config of concrete File System.
+     */
+    protected Configuration getFileSystemConfig() {
+        Configuration cfg = new Configuration();
+
+        cfg.addResource(U.resolveIgniteUrl(HADOOP_FS_CFG));
+
+        return cfg;
+    }
+
+    /**
+     * Gets File System name.
+     *
+     * @param grid Grid index.
+     * @return File System name.
+     */
+    protected URI getFileSystemURI(int grid) {
+        try {
+            return new URI("ggfs://127.0.0.1:" + (IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid));
+        }
+        catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** @throws Exception If failed. */
+    public void testContentsConsistency() throws Exception {
+        try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) {
+            Collection<IgniteBiTuple<String, Long>> files = F.asList(
+                F.t("/dir1/dir2/file1", 1024L),
+                F.t("/dir1/dir2/file2", 8 * 1024L),
+                F.t("/dir1/file1", 1024 * 1024L),
+                F.t("/dir1/file2", 5 * 1024 * 1024L),
+                F.t("/file1", 64 * 1024L + 13),
+                F.t("/file2", 13L),
+                F.t("/file3", 123764L)
+            );
+
+            // Write every file through node 0; first and last bytes carry marker values.
+            for (IgniteBiTuple<String, Long> file : files) {
+
+                info("Writing file: " + file.get1());
+
+                try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) {
+                    byte[] data = new byte[file.get2().intValue()];
+
+                    data[0] = 25;
+                    data[data.length - 1] = 26;
+
+                    os.write(data);
+                }
+
+                info("Finished writing file: " + file.get1());
+            }
+
+            for (int i = 1; i < nodeCount(); i++) {
+
+                // NOTE(review): a per-node file system is opened here, but all reads below
+                // still go through the outer 'fs' (node 0) — confirm whether 'ignored'
+                // should be used instead to actually verify cross-node consistency.
+                try (FileSystem ignored = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) {
+                    for (IgniteBiTuple<String, Long> file : files) {
+                        Path path = new Path(file.get1());
+
+                        FileStatus fileStatus = fs.getFileStatus(path);
+
+                        assertEquals(file.get2(), (Long)fileStatus.getLen());
+
+                        byte[] read = new byte[file.get2().intValue()];
+
+                        info("Reading file: " + path);
+
+                        try (FSDataInputStream in = fs.open(path)) {
+                            in.readFully(read);
+
+                            assert read[0] == 25;
+                            assert read[read.length - 1] == 26;
+                        }
+
+                        info("Finished reading file: " + path);
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgniteFsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgniteFsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgniteFsEventsTestSuite.java
deleted file mode 100644
index 2d07e9d..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/IgniteFsEventsTestSuite.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ignitefs;
-
-import junit.framework.*;
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.fs.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.ignitefs.IgniteFsMode.*;
-
-/**
- * Test suite for GGFS event tests.
- */
-@SuppressWarnings("PublicInnerClass")
-public class IgniteFsEventsTestSuite extends TestSuite {
-    /**
-     * @return Test suite.
-     * @throws Exception Thrown in case of the failure.
-     */
-    public static TestSuite suite() throws Exception {
-        GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);
-
-        TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
-
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemPrivate.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName())));
-
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
-
-        return suite;
-    }
-
-    /**
-     * @return Test suite with only tests that are supported on all platforms.
-     * @throws Exception Thrown in case of the failure.
-     */
-    public static TestSuite suiteNoarchOnly() throws Exception {
-        GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);
-
-        TestSuite suite = new TestSuite("Ignite GGFS Events Test Suite Noarch Only");
-
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
-
-        return suite;
-    }
-
-    /**
-     * Shared memory IPC in PRIVATE mode.
-     */
-    public static class ShmemPrivate extends IgfsEventsAbstractSelfTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{
-                put("type", "shmem");
-                put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1));
-            }});
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback socket IPS in PRIVATE mode.
-     */
-    public static class LoopbackPrivate extends IgfsEventsAbstractSelfTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{
-                put("type", "tcp");
-                put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1));
-            }});
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Base class for all GGFS tests with primary and secondary file system.
-     */
-    public abstract static class PrimarySecondaryTest extends IgfsEventsAbstractSelfTest {
-        /** Secondary file system. */
-        private static IgniteFs ggfsSec;
-
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setSecondaryFileSystem(new GridGgfsHadoopFileSystemWrapper(
-                "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/",
-                "modules/core/src/test/config/hadoop/core-site-secondary.xml"));
-
-            return ggfsCfg;
-        }
-
-        /**
-         * @return GGFS configuration for secondary file system.
-         */
-        protected IgniteFsConfiguration getSecondaryGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setName("ggfs-secondary");
-            ggfsCfg.setDefaultMode(PRIMARY);
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>(){{
-                put("type", "tcp");
-                put("port", "11500");
-            }});
-
-            return ggfsCfg;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void beforeTestsStarted() throws Exception {
-            ggfsSec = startSecondary();
-
-            super.beforeTestsStarted();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void afterTestsStopped() throws Exception {
-            super.afterTestsStopped();
-
-            G.stopAll(true);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void afterTest() throws Exception {
-            super.afterTest();
-
-            // Clean up secondary file system.
-            ggfsSec.format();
-        }
-
-        /**
-         * Start a grid with the secondary file system.
-         *
-         * @return Secondary file system handle.
-         * @throws Exception If failed.
-         */
-        @Nullable private IgniteFs startSecondary() throws Exception {
-            IgniteConfiguration cfg = getConfiguration("grid-secondary", getSecondaryGgfsConfiguration());
-
-            cfg.setLocalHost("127.0.0.1");
-            cfg.setPeerClassLoadingEnabled(false);
-
-            Ignite secG = G.start(cfg);
-
-            return secG.fileSystem("ggfs-secondary");
-        }
-    }
-
-    /**
-     * Shared memory IPC in DUAL_SYNC mode.
-     */
-    public static class ShmemDualSync extends PrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_SYNC);
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Shared memory IPC in DUAL_SYNC mode.
-     */
-    public static class ShmemDualAsync extends PrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_ASYNC);
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback socket IPC with secondary file system.
-     */
-    public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setSecondaryFileSystem(new GridGgfsHadoopFileSystemWrapper(
-                "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/",
-                "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
-
-            return ggfsCfg;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getSecondaryGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getSecondaryGgfsConfiguration();
-
-            ggfsCfg.setName("ggfs-secondary");
-            ggfsCfg.setDefaultMode(PRIMARY);
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{
-                put("type", "tcp");
-                put("port", "11500");
-            }});
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback IPC in DUAL_SYNC mode.
-     */
-    public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_SYNC);
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback socket IPC in DUAL_ASYNC mode.
-     */
-    public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_ASYNC);
-
-            return ggfsCfg;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsNodeStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsNodeStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsNodeStartup.java
deleted file mode 100644
index 07714cf..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsNodeStartup.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.ggfs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import javax.swing.*;
-
-/**
- * Node startup for GGFS performance benchmark.
- */
-public class GridGgfsNodeStartup {
-    /**
-     * Start up an empty node with specified cache configuration.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteCheckedException If example execution failed.
-     */
-    public static void main(String[] args) throws IgniteCheckedException {
-        try (Ignite ignored = G.start("config/hadoop/default-config.xml")) {
-            // Wait until Ok is pressed.
-            JOptionPane.showMessageDialog(
-                null,
-                new JComponent[] {
-                    new JLabel("Ignite started."),
-                    new JLabel("Press OK to stop Ignite.")
-                },
-                "Ignite",
-                JOptionPane.INFORMATION_MESSAGE
-            );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsPerformanceBenchmark.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsPerformanceBenchmark.java
deleted file mode 100644
index 73cde0e..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/GridGgfsPerformanceBenchmark.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.loadtests.ggfs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.testframework.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Tests write throughput.
- */
-public class GridGgfsPerformanceBenchmark {
-    /** Path to test hadoop configuration. */
-    private static final String HADOOP_FS_CFG = "modules/core/src/test/config/hadoop/core-site.xml";
-
-    /** FS prefix. */
-    private static final String FS_PREFIX = "ggfs:///";
-
-    /** Test writes. */
-    private static final int OP_WRITE = 0;
-
-    /** Test reads. */
-    private static final int OP_READ = 1;
-
-    /**
-     * Starts benchmark.
-     *
-     * @param args Program arguments.
-     *      [0] - number of threads, default 1.
-     *      [1] - file length, default is 1GB.
-     *      [2] - stream buffer size, default is 1M.
-     *      [3] - fs config path.
-     * @throws Exception If failed.
-     */
-    public static void main(String[] args) throws Exception {
-        final int threadNum = intArgument(args, 0, 1);
-        final int op = intArgument(args, 1, OP_WRITE);
-        final long fileLen = longArgument(args, 2, 256 * 1024 * 1024);
-        final int bufSize = intArgument(args, 3, 128 * 1024);
-        final String cfgPath = argument(args, 4, HADOOP_FS_CFG);
-        final String fsPrefix = argument(args, 5, FS_PREFIX);
-        final short replication = (short)intArgument(args, 6, 3);
-
-        final Path ggfsHome = new Path(fsPrefix);
-
-        final FileSystem fs = ggfs(ggfsHome, cfgPath);
-
-        final AtomicLong progress = new AtomicLong();
-
-        final AtomicInteger idx = new AtomicInteger();
-
-        System.out.println("Warming up...");
-
-//        warmUp(fs, ggfsHome, op, fileLen);
-
-        System.out.println("Finished warm up.");
-
-        if (op == OP_READ) {
-            for (int i = 0; i < threadNum; i++)
-                benchmarkWrite(fs, new Path(ggfsHome, "in-" + i), fileLen, bufSize, replication, null);
-        }
-
-        long total = 0;
-
-        long start = System.currentTimeMillis();
-
-        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            @Override public void run() {
-                String fileIdx = op == OP_READ ? String.valueOf(idx.getAndIncrement()) : UUID.randomUUID().toString();
-
-                try {
-                    for (int i = 0; i < 200; i++) {
-                        if (op == OP_WRITE)
-                            benchmarkWrite(fs, new Path(ggfsHome, "out-" + fileIdx), fileLen, bufSize, replication,
-                                progress);
-                        else
-                            benchmarkRead(fs, new Path(ggfsHome, "in-" + fileIdx), bufSize, progress);
-                    }
-
-                    System.out.println("Finished " + (op == OP_WRITE ? "writing" : "reading") + " data.");
-                }
-                catch (Exception e) {
-                    System.out.println("Failed to process stream: " + e);
-
-                    e.printStackTrace();
-                }
-            }
-        }, threadNum, "test-runner");
-
-        while (!fut.isDone()) {
-            U.sleep(1000);
-
-            long written = progress.getAndSet(0);
-
-            total += written;
-
-            int mbytesPerSec = (int)(written / (1024 * 1024));
-
-            System.out.println((op == OP_WRITE ? "Write" : "Read") + " rate [threads=" + threadNum +
-                ", bufSize=" + bufSize + ", MBytes/s=" + mbytesPerSec + ']');
-        }
-
-        long now = System.currentTimeMillis();
-
-        System.out.println((op == OP_WRITE ? "Written" : "Read") + " " + total + " bytes in " + (now - start) +
-            "ms, avg write rate is " + (total * 1000 / ((now - start) * 1024 * 1024)) + "MBytes/s");
-
-        fs.close();
-    }
-
-    /**
-     * Warms up server side.
-     *
-     * @param fs File system.
-     * @param ggfsHome GGFS home.
-     * @throws Exception If failed.
-     */
-    private static void warmUp(FileSystem fs, Path ggfsHome, int op, long fileLen) throws Exception {
-        Path file = new Path(ggfsHome, "out-0");
-
-        benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null);
-
-        for (int i = 0; i < 5; i++) {
-            if (op == OP_WRITE)
-                benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null);
-            else
-                benchmarkRead(fs, file, 1024 * 1024, null);
-        }
-
-        fs.delete(file, true);
-    }
-
-    /**
-     * @param args Arguments.
-     * @param idx Index.
-     * @param dflt Default value.
-     * @return Argument value.
-     */
-    private static int intArgument(String[] args, int idx, int dflt) {
-        if (args.length <= idx)
-            return dflt;
-
-        try {
-            return Integer.parseInt(args[idx]);
-        }
-        catch (NumberFormatException ignored) {
-            return dflt;
-        }
-    }
-
-    /**
-     * @param args Arguments.
-     * @param idx Index.
-     * @param dflt Default value.
-     * @return Argument value.
-     */
-    private static long longArgument(String[] args, int idx, long dflt) {
-        if (args.length <= idx)
-            return dflt;
-
-        try {
-            return Long.parseLong(args[idx]);
-        }
-        catch (NumberFormatException ignored) {
-            return dflt;
-        }
-    }
-
-    /**
-     * @param args Arguments.
-     * @param idx Index.
-     * @param dflt Default value.
-     * @return Argument value.
-     */
-    private static String argument(String[] args, int idx, String dflt) {
-        if (args.length <= idx)
-            return dflt;
-
-        return args[idx];
-    }
-
-    /** {@inheritDoc} */
-    private static FileSystem ggfs(Path home, String cfgPath) throws IOException {
-        Configuration cfg = new Configuration();
-
-        cfg.addResource(U.resolveIgniteUrl(cfgPath));
-
-        return FileSystem.get(home.toUri(), cfg);
-    }
-
-    /**
-     * Tests stream write to specified file.
-     *
-     * @param file File to write to.
-     * @param len Length to write.
-     * @param bufSize Buffer size.
-     * @param replication Replication factor.
-     * @param progress Progress that will be incremented on each written chunk.
-     */
-    private static void benchmarkWrite(FileSystem fs, Path file, long len, int bufSize, short replication,
-        @Nullable AtomicLong progress) throws Exception {
-
-        try (FSDataOutputStream out = fs.create(file, true, bufSize, replication, fs.getDefaultBlockSize())) {
-            long written = 0;
-
-            byte[] data = new byte[bufSize];
-
-            while (written < len) {
-                int chunk = (int) Math.min(len - written, bufSize);
-
-                out.write(data, 0, chunk);
-
-                written += chunk;
-
-                if (progress != null)
-                    progress.addAndGet(chunk);
-            }
-
-            out.flush();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    /**
-     * Tests stream read from specified file.
-     *
-     * @param file File to write from
-     * @param bufSize Buffer size.
-     * @param progress Progress that will be incremented on each written chunk.
-     */
-    private static void benchmarkRead(FileSystem fs, Path file, int bufSize, @Nullable AtomicLong progress)
-        throws Exception {
-
-        try (FSDataInputStream in = fs.open(file, bufSize)) {
-            byte[] data = new byte[32 * bufSize];
-
-            while (true) {
-                int read = in.read(data);
-
-                if (read < 0)
-                    return;
-
-                if (progress != null)
-                    progress.addAndGet(read);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java
new file mode 100644
index 0000000..5e8d06b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.loadtests.ggfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import javax.swing.*;
+
+/**
+ * Node startup for GGFS performance benchmark.
+ */
+public class IgfsNodeStartup {
+    /**
+     * Start up an empty node with specified cache configuration.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteCheckedException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteCheckedException {
+        try (Ignite ignored = G.start("config/hadoop/default-config.xml")) {
+            // Wait until Ok is pressed.
+            JOptionPane.showMessageDialog(
+                null,
+                new JComponent[] {
+                    new JLabel("Ignite started."),
+                    new JLabel("Press OK to stop Ignite.")
+                },
+                "Ignite",
+                JOptionPane.INFORMATION_MESSAGE
+            );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java
new file mode 100644
index 0000000..0fa6991
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.loadtests.ggfs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests write throughput.
+ */
+public class IgfsPerformanceBenchmark {
+    /** Path to test hadoop configuration. */
+    private static final String HADOOP_FS_CFG = "modules/core/src/test/config/hadoop/core-site.xml";
+
+    /** FS prefix. */
+    private static final String FS_PREFIX = "ggfs:///";
+
+    /** Test writes. */
+    private static final int OP_WRITE = 0;
+
+    /** Test reads. */
+    private static final int OP_READ = 1;
+
+    /**
+     * Starts benchmark.
+     *
+     * @param args Program arguments.
+     *      [0] - number of threads, default 1.
+     *      [1] - file length, default is 1GB.
+     *      [2] - stream buffer size, default is 1M.
+     *      [3] - fs config path.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        final int threadNum = intArgument(args, 0, 1);
+        final int op = intArgument(args, 1, OP_WRITE);
+        final long fileLen = longArgument(args, 2, 256 * 1024 * 1024);
+        final int bufSize = intArgument(args, 3, 128 * 1024);
+        final String cfgPath = argument(args, 4, HADOOP_FS_CFG);
+        final String fsPrefix = argument(args, 5, FS_PREFIX);
+        final short replication = (short)intArgument(args, 6, 3);
+
+        final Path ggfsHome = new Path(fsPrefix);
+
+        final FileSystem fs = ggfs(ggfsHome, cfgPath);
+
+        final AtomicLong progress = new AtomicLong();
+
+        final AtomicInteger idx = new AtomicInteger();
+
+        System.out.println("Warming up...");
+
+//        warmUp(fs, ggfsHome, op, fileLen);
+
+        System.out.println("Finished warm up.");
+
+        if (op == OP_READ) {
+            for (int i = 0; i < threadNum; i++)
+                benchmarkWrite(fs, new Path(ggfsHome, "in-" + i), fileLen, bufSize, replication, null);
+        }
+
+        long total = 0;
+
+        long start = System.currentTimeMillis();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                String fileIdx = op == OP_READ ? String.valueOf(idx.getAndIncrement()) : UUID.randomUUID().toString();
+
+                try {
+                    for (int i = 0; i < 200; i++) {
+                        if (op == OP_WRITE)
+                            benchmarkWrite(fs, new Path(ggfsHome, "out-" + fileIdx), fileLen, bufSize, replication,
+                                progress);
+                        else
+                            benchmarkRead(fs, new Path(ggfsHome, "in-" + fileIdx), bufSize, progress);
+                    }
+
+                    System.out.println("Finished " + (op == OP_WRITE ? "writing" : "reading") + " data.");
+                }
+                catch (Exception e) {
+                    System.out.println("Failed to process stream: " + e);
+
+                    e.printStackTrace();
+                }
+            }
+        }, threadNum, "test-runner");
+
+        while (!fut.isDone()) {
+            U.sleep(1000);
+
+            long written = progress.getAndSet(0);
+
+            total += written;
+
+            int mbytesPerSec = (int)(written / (1024 * 1024));
+
+            System.out.println((op == OP_WRITE ? "Write" : "Read") + " rate [threads=" + threadNum +
+                ", bufSize=" + bufSize + ", MBytes/s=" + mbytesPerSec + ']');
+        }
+
+        long now = System.currentTimeMillis();
+
+        System.out.println((op == OP_WRITE ? "Written" : "Read") + " " + total + " bytes in " + (now - start) +
+            "ms, avg write rate is " + (total * 1000 / ((now - start) * 1024 * 1024)) + "MBytes/s");
+
+        fs.close();
+    }
+
+    /**
+     * Warms up server side.
+     *
+     * @param fs File system.
+     * @param ggfsHome GGFS home.
+     * @throws Exception If failed.
+     */
+    private static void warmUp(FileSystem fs, Path ggfsHome, int op, long fileLen) throws Exception {
+        Path file = new Path(ggfsHome, "out-0");
+
+        benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null);
+
+        for (int i = 0; i < 5; i++) {
+            if (op == OP_WRITE)
+                benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null);
+            else
+                benchmarkRead(fs, file, 1024 * 1024, null);
+        }
+
+        fs.delete(file, true);
+    }
+
+    /**
+     * @param args Arguments.
+     * @param idx Index.
+     * @param dflt Default value.
+     * @return Argument value.
+     */
+    private static int intArgument(String[] args, int idx, int dflt) {
+        if (args.length <= idx)
+            return dflt;
+
+        try {
+            return Integer.parseInt(args[idx]);
+        }
+        catch (NumberFormatException ignored) {
+            return dflt;
+        }
+    }
+
+    /**
+     * @param args Arguments.
+     * @param idx Index.
+     * @param dflt Default value.
+     * @return Argument value.
+     */
+    private static long longArgument(String[] args, int idx, long dflt) {
+        if (args.length <= idx)
+            return dflt;
+
+        try {
+            return Long.parseLong(args[idx]);
+        }
+        catch (NumberFormatException ignored) {
+            return dflt;
+        }
+    }
+
+    /**
+     * @param args Arguments.
+     * @param idx Index.
+     * @param dflt Default value.
+     * @return Argument value.
+     */
+    private static String argument(String[] args, int idx, String dflt) {
+        if (args.length <= idx)
+            return dflt;
+
+        return args[idx];
+    }
+
+    /** {@inheritDoc} */
+    private static FileSystem ggfs(Path home, String cfgPath) throws IOException {
+        Configuration cfg = new Configuration();
+
+        cfg.addResource(U.resolveIgniteUrl(cfgPath));
+
+        return FileSystem.get(home.toUri(), cfg);
+    }
+
+    /**
+     * Tests stream write to specified file.
+     *
+     * @param file File to write to.
+     * @param len Length to write.
+     * @param bufSize Buffer size.
+     * @param replication Replication factor.
+     * @param progress Progress that will be incremented on each written chunk.
+     */
+    private static void benchmarkWrite(FileSystem fs, Path file, long len, int bufSize, short replication,
+        @Nullable AtomicLong progress) throws Exception {
+
+        try (FSDataOutputStream out = fs.create(file, true, bufSize, replication, fs.getDefaultBlockSize())) {
+            long written = 0;
+
+            byte[] data = new byte[bufSize];
+
+            while (written < len) {
+                int chunk = (int) Math.min(len - written, bufSize);
+
+                out.write(data, 0, chunk);
+
+                written += chunk;
+
+                if (progress != null)
+                    progress.addAndGet(chunk);
+            }
+
+            out.flush();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    /**
+     * Tests stream read from specified file.
+     *
+     * @param file File to write from
+     * @param bufSize Buffer size.
+     * @param progress Progress that will be incremented on each written chunk.
+     */
+    private static void benchmarkRead(FileSystem fs, Path file, int bufSize, @Nullable AtomicLong progress)
+        throws Exception {
+
+        try (FSDataInputStream in = fs.open(file, bufSize)) {
+            byte[] data = new byte[32 * bufSize];
+
+            while (true) {
+                int read = in.read(data);
+
+                if (read < 0)
+                    return;
+
+                if (progress != null)
+                    progress.addAndGet(read);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteFsLinuxAndMacOSTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteFsLinuxAndMacOSTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteFsLinuxAndMacOSTestSuite.java
index 58b685f..eed2da2 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteFsLinuxAndMacOSTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteFsLinuxAndMacOSTestSuite.java
@@ -42,21 +42,21 @@ public class IgniteFsLinuxAndMacOSTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemExternalPrimarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemExternalSecondarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemExternalDualSyncSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemExternalDualAsyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalPrimarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalSecondarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalDualSyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalDualAsyncSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemEmbeddedPrimarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemEmbeddedSecondarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemEmbeddedDualSyncSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedPrimarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedSecondarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedDualSyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemIpcCacheSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemIpcCacheSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoop20FileSystemShmemPrimarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoop20FileSystemShmemPrimarySelfTest.class.getName())));
 
-        suite.addTest(IgniteFsEventsTestSuite.suite());
+        suite.addTest(IgfsEventsTestSuite.suite());
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/132f900c/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 240281e..578df0a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -53,30 +53,30 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackExternalDualAsyncSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalDualAsyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemSecondaryModeSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemSecondaryModeSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemClientSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemClientSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoggerStateSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemLoggerSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoggerStateSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoggerSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopFileSystemHandshakeSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemHandshakeSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoop20FileSystemLoopbackPrimarySelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoop20FileSystemLoopbackPrimarySelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopDualSyncSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridGgfsHadoopDualAsyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopDualSyncSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopDualAsyncSelfTest.class.getName())));
 
-        suite.addTest(IgniteFsEventsTestSuite.suiteNoarchOnly());
+        suite.addTest(IgfsEventsTestSuite.suiteNoarchOnly());
 
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopFileSystemsTest.class.getName())));
 


Mime
View raw message