ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/47] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.
Date Mon, 26 Sep 2016 11:24:50 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java
new file mode 100644
index 0000000..a65d691
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolEmbeddedSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.client;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+
+/**
+ * Hadoop client protocol tests in embedded process mode.
+ */
+public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest {
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        // Start from the parent configuration; nothing is overridden until the TODO below is resolved.
+        HadoopConfiguration hadoopCfg = super.hadoopConfiguration(gridName);
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //hadoopCfg.setExternalExecution(false);
+
+        return hadoopCfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
new file mode 100644
index 0000000..1ef7dd0
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.client;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.StringTokenizer;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Hadoop client protocol tests in external process mode.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
+    /** Input path. */
+    private static final String PATH_INPUT = "/input";
+
+    /** Output path. */
+    private static final String PATH_OUTPUT = "/output";
+
+    /** Job name. */
+    private static final String JOB_NAME = "myJob";
+
+    /** Setup lock file: while it exists, {@code TestOutputCommitter.setupJob()} keeps waiting. */
+    private static final File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-setup.file");
+
+    /** Map lock file: while it exists, {@code TestMapper.map()} keeps waiting. */
+    private static final File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-map.file");
+
+    /** Reduce lock file: while it exists, {@code TestReducer.reduce()} keeps waiting. */
+    private static final File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp",
+        "ignite-lock-reduce.file");
+
+    /** {@inheritDoc} This suite runs on two grid nodes. */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} IGFS is enabled: the tests write job input to it and dump job output from it. */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} REST is enabled: the client configuration built by the tests targets {@code REST_PORT}. */
+    @Override protected boolean restEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+
+        // Make sure none of the lock files exists before the first test.
+        for (File lockFile : new File[] {setupLockFile, mapLockFile, reduceLockFile})
+            lockFile.delete();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+        // NOTE(review): disabled cleanup, left for reference -- IgniteHadoopClientProtocolProvider.cliMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        File[] lockFiles = {setupLockFile, mapLockFile, reduceLockFile};
+
+        for (File lockFile : lockFiles)
+            lockFile.createNewFile();
+
+        for (File lockFile : lockFiles)
+            lockFile.deleteOnExit();
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Format IGFS after every test.
+        grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format();
+
+        for (File lockFile : new File[] {setupLockFile, mapLockFile, reduceLockFile})
+            lockFile.delete();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks that two consecutive job ID requests return distinct, non-null IDs.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void tstNextJobId() throws Exception {
+        ClientProtocol proto = provider().create(config(HadoopAbstractSelfTest.REST_PORT));
+
+        JobID firstId = proto.getNewJobID();
+
+        assert firstId != null;
+        assert firstId.getJtIdentifier() != null;
+
+        // Ask again on the same protocol instance.
+        JobID secondId = proto.getNewJobID();
+
+        assert secondId != null;
+        assert secondId.getJtIdentifier() != null;
+
+        // Each request must produce a different ID.
+        assert !F.eq(firstId, secondId);
+    }
+
+    /**
+     * Runs a job built from the counting mapper, combiner and reducer and verifies the counters it reports.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJobCounters() throws Exception {
+        IgniteFileSystem fs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
+
+        fs.mkdirs(new IgfsPath(PATH_INPUT));
+
+        IgfsPath inFile = new IgfsPath(PATH_INPUT + "/test.file");
+
+        // Nine input lines: three distinct words, each written three times.
+        try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(inFile, true)))) {
+            out.write(
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n" +
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n" +
+                "alpha\n" +
+                "beta\n" +
+                "gamma\n"
+            );
+        }
+
+        final Job job = Job.getInstance(config(HadoopAbstractSelfTest.REST_PORT));
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestCountingMapper.class);
+        job.setReducerClass(TestCountingReducer.class);
+        job.setCombinerClass(TestCountingCombiner.class);
+
+        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+        job.submit();
+
+        // The counter fetched on the client starts at zero and can be changed locally.
+        final Counter locCntr = job.getCounters().findCounter(TestCounter.COUNTER1);
+
+        assertEquals(0, locCntr.getValue());
+
+        locCntr.increment(10);
+
+        assertEquals(10, locCntr.getValue());
+
+        // Transferring to map phase.
+        setupLockFile.delete();
+
+        // Transferring to reduce phase (the counting combiner and reducer do not wait on the reduce lock).
+        mapLockFile.delete();
+
+        job.waitForCompletion(false);
+
+        assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
+
+        final Counters res = job.getCounters();
+
+        assertNotNull("counters cannot be null", res);
+        assertEquals("wrong counters count", 3, res.countCounters());
+        assertEquals("wrong counter value", 15, res.findCounter(TestCounter.COUNTER1).getValue());
+        assertEquals("wrong counter value", 3, res.findCounter(TestCounter.COUNTER2).getValue());
+        assertEquals("wrong counter value", 3, res.findCounter(TestCounter.COUNTER3).getValue());
+    }
+
+    /**
+     * Checks that requesting counters for an unknown job ID fails with {@link IOException}.
+     *
+     * @throws Exception If failed.
+     */
+    private void tstUnknownJobCounters() throws Exception {
+        ClientProtocol proto = provider().create(config(HadoopAbstractSelfTest.REST_PORT));
+
+        JobID unknownId = new JobID(UUID.randomUUID().toString(), -1);
+
+        try {
+            proto.getJobCounters(unknownId);
+            fail("exception must be thrown");
+        }
+        catch (Exception e) {
+            assert e instanceof IOException : "wrong error has been thrown";
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMap() throws Exception {
+        checkJobSubmit(true, true); // No combiner, no reducers.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapCombine() throws Exception {
+        checkJobSubmit(false, true); // With combiner, no reducers.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapReduce() throws Exception {
+        checkJobSubmit(true, false); // No combiner, with reducers.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void tstJobSubmitMapCombineReduce() throws Exception {
+        checkJobSubmit(false, false); // With combiner and reducers.
+    }
+
+    /**
+     * Submits a job and drives it through setup, map and (optionally) reduce, checking its status in each phase.
+     *
+     * @param noCombiners Whether there are no combiners.
+     * @param noReducers Whether there are no reducers.
+     * @throws Exception If failed.
+     */
+    public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception {
+        IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
+
+        igfs.mkdirs(new IgfsPath(PATH_INPUT));
+
+        try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
+            new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
+
+            bw.write("word");
+        }
+
+        Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
+
+        final Job job = Job.getInstance(conf);
+
+        job.setJobName(JOB_NAME);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+        job.setReducerClass(TestReducer.class);
+
+        if (!noCombiners)
+            job.setCombinerClass(TestCombiner.class);
+
+        if (noReducers)
+            job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(TestOutputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path(PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT));
+
+        job.submit();
+
+        JobID jobId = job.getJobID();
+
+        // Setup phase (held open by the setup lock file).
+        JobStatus jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+        assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f;
+        assert jobStatus.getMapProgress() == 0.0f;
+        assert jobStatus.getReduceProgress() == 0.0f;
+
+        U.sleep(2100);
+
+        JobStatus recentJobStatus = job.getStatus();
+
+        assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() :
+            "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress();
+
+        // Transferring to map phase.
+        setupLockFile.delete();
+
+        assertTrue("Setup phase has not completed in time.", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return F.eq(1.0f, job.getStatus().getSetupProgress());
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception.", e);
+                }
+            }
+        }, 5000L));
+
+        // Map phase (held open by the map lock file).
+        jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+        assert jobStatus.getSetupProgress() == 1.0f;
+        assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f;
+        assert jobStatus.getReduceProgress() == 0.0f;
+
+        U.sleep(2100);
+
+        recentJobStatus = job.getStatus();
+
+        assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() :
+            "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress();
+
+        // Transferring to reduce phase.
+        mapLockFile.delete();
+
+        assertTrue("Map phase has not completed in time.", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return F.eq(1.0f, job.getStatus().getMapProgress());
+                }
+                catch (Exception e) {
+                    throw new RuntimeException("Unexpected exception.", e);
+                }
+            }
+        }, 5000L));
+
+        if (!noReducers) {
+            // Reduce phase (held open by the reduce lock file).
+            jobStatus = job.getStatus();
+            checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f);
+            assert jobStatus.getSetupProgress() == 1.0f;
+            assert jobStatus.getMapProgress() == 1.0f;
+            assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f;
+
+            // Ensure that reduce progress increases.
+            U.sleep(2100);
+
+            recentJobStatus = job.getStatus();
+
+            assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() :
+                "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress();
+
+            reduceLockFile.delete();
+        }
+
+        job.waitForCompletion(false);
+
+        jobStatus = job.getStatus();
+        checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f);
+        assert jobStatus.getSetupProgress() == 1.0f;
+        assert jobStatus.getMapProgress() == 1.0f;
+        assert jobStatus.getReduceProgress() == 1.0f;
+
+        dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT));
+    }
+
+    /**
+     * Prints the given IGFS path to standard output: directories are walked recursively,
+     * regular files are printed line by line.
+     *
+     * @param igfs IGFS.
+     * @param path Path.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception {
+        IgfsFile entry = igfs.info(path);
+
+        assert entry != null;
+
+        System.out.println(entry.path());
+
+        if (entry.isDirectory()) {
+            // Descend into every child of the directory.
+            for (IgfsPath child : igfs.listPaths(path))
+                dumpIgfs(igfs, child);
+
+            return;
+        }
+
+        // Regular file: echo its content.
+        // No charset is passed to the reader, so the platform default is used.
+        try (BufferedReader rdr = new BufferedReader(new InputStreamReader(igfs.open(path)))) {
+            for (String ln = rdr.readLine(); ln != null; ln = rdr.readLine())
+                System.out.println(ln);
+        }
+    }
+
+    /**
+     * Asserts that a status snapshot carries the expected job ID, job name, state and cleanup progress.
+     *
+     * @param status Job status.
+     * @param expJobId Expected job ID.
+     * @param expJobName Expected job name.
+     * @param expState Expected state.
+     * @param expCleanupProgress Expected cleanup progress.
+     * @throws Exception If failed.
+     */
+    private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName,
+        JobStatus.State expState, float expCleanupProgress) throws Exception {
+        assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID();
+        assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName();
+        assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState();
+        assert F.eq(status.getCleanupProgress(), expCleanupProgress) :
+            "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress();
+    }
+
+    /**
+     * Builds a client-side Hadoop configuration that uses the Ignite framework and the IGFS of grid 0.
+     * @param port Port appended to {@code 127.0.0.1} to form the master address.
+     * @return Configuration.
+     */
+    private Configuration config(int port) {
+        Configuration cfg = HadoopUtils.safeCreateConfiguration();
+        setupFileSystems(cfg);
+
+        cfg.set(MRConfig.FRAMEWORK_NAME, IgniteHadoopClientProtocolProvider.FRAMEWORK_NAME);
+        cfg.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port);
+        cfg.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/");
+
+        return cfg;
+    }
+
+    /**
+     * @return Protocol provider; a new instance is created on every call.
+     */
+    private IgniteHadoopClientProtocolProvider provider() {
+        return new IgniteHadoopClientProtocolProvider();
+    }
+
+    /**
+     * Test mapper. Waits until the map lock file is removed, then emits {@code (token, 1)} for every
+     * whitespace-separated token of the input value.
+     */
+    public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** Constant count of 1 that is emitted for every word found. */
+        private static final IntWritable one = new IntWritable(1);
+
+        /** Reusable container for the word being emitted. */
+        private Text word = new Text();
+
+        /** {@inheritDoc} */
+        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            // Hold the map phase until the test deletes the lock file.
+            while (mapLockFile.exists())
+                Thread.sleep(50);
+
+            for (StringTokenizer tokens = new StringTokenizer(val.toString()); tokens.hasMoreTokens(); ) {
+                word.set(tokens.nextToken());
+
+                ctx.write(word, one);
+            }
+        }
+    }
+
+    /**
+     * Test counters. COUNTER1: map, combine and reduce calls; COUNTER2: combine calls; COUNTER3: reduce calls.
+     */
+    public enum TestCounter {
+        COUNTER1, COUNTER2, COUNTER3
+    }
+
+    /**
+     * Test mapper that additionally increments {@code COUNTER1} once per {@code map()} call.
+     */
+    public static class TestCountingMapper extends TestMapper {
+        /** {@inheritDoc} */
+        @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            super.map(key, val, ctx);
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+        }
+    }
+
+    /**
+     * Test combiner that counts invocations in {@code COUNTER1} and {@code COUNTER2} and emits the per-key sum.
+     */
+    public static class TestCountingCombiner extends TestReducer {
+        /** {@inheritDoc} */
+        @Override public void reduce(Text key, Iterable<IntWritable> values,
+            Context ctx) throws IOException, InterruptedException {
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+            ctx.getCounter(TestCounter.COUNTER2).increment(1);
+            int total = 0;
+            for (IntWritable cnt : values)
+                total += cnt.get();
+
+            ctx.write(key, new IntWritable(total));
+        }
+    }
+
+    /**
+     * Test reducer that only counts invocations ({@code COUNTER1}, {@code COUNTER3}); it writes no output.
+     */
+    public static class TestCountingReducer extends TestReducer {
+        @Override public void reduce(Text key, Iterable<IntWritable> values,
+            Context ctx) throws IOException, InterruptedException {
+            ctx.getCounter(TestCounter.COUNTER1).increment(1);
+            ctx.getCounter(TestCounter.COUNTER3).increment(1);
+        }
+    }
+
+    /**
+     * Test combiner: inherits the behavior of the Hadoop {@link Reducer} without overriding anything.
+     */
+    public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
+        // No-op.
+    }
+
+    /** Output format that wraps the committer of {@link TextOutputFormat} into {@link TestOutputCommitter}. */
+    public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> {
+        /** {@inheritDoc} */
+        @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx) throws IOException {
+            return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx));
+        }
+    }
+
+    /**
+     * Test output committer: holds job setup until the setup lock file is removed, then delegates.
+     */
+    private static class TestOutputCommitter extends FileOutputCommitter {
+        /** Delegate. */
+        private final FileOutputCommitter delegate;
+
+        /**
+         * Constructor.
+         *
+         * @param ctx Task attempt context.
+         * @param delegate Delegate.
+         * @throws IOException If failed.
+         */
+        private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException {
+            super(FileOutputFormat.getOutputPath(ctx), ctx);
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} Waits until the setup lock file is gone; an interrupt is rethrown as {@link IOException}. */
+        @Override public void setupJob(JobContext jobCtx) throws IOException {
+            try {
+                while (setupLockFile.exists())
+                    Thread.sleep(50);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted.", e);
+            }
+
+            delegate.setupJob(jobCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.setupTask(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException {
+            return delegate.needsTaskCommit(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.commitTask(taskCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException {
+            delegate.abortTask(taskCtx);
+        }
+    }
+
+    /**
+     * Test reducer: waits until the reduce lock file is removed, then emits the sum of the counts of a key.
+     */
+    public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** Writable container for writing sum of word counts. */
+        private IntWritable totalWordCnt = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+            InterruptedException {
+            // Hold the reduce phase until the test deletes the lock file.
+            while (reduceLockFile.exists())
+                Thread.sleep(50);
+
+            int sum = 0;
+
+            for (IntWritable cnt : values)
+                sum += cnt.get();
+
+            totalWordCnt.set(sum);
+            ctx.write(key, totalWordCnt);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java
new file mode 100644
index 0000000..0df9c6a
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * Example job for testing hadoop task execution.
+ */
+public class HadoopWordCount1 {
+    /**
+     * Runs the word count job for the input and output given on the command line.
+     *
+     * @param args Command line parameters: input and output.
+     * @throws Exception If fails.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println("usage: [input] [output]");
+            System.exit(-1);
+        }
+
+        // Configure the job from the two arguments and run it.
+        JobClient.runJob(getJob(args[0], args[1]));
+    }
+
+    /**
+     * Builds the word count job configuration for the given input and output locations.
+     *
+     * @param input Input file name.
+     * @param output Output directory name.
+     * @return Fully configured job.
+     */
+    public static JobConf getJob(String input, String output) {
+        JobConf jobConf = new JobConf(HadoopWordCount1.class);
+        jobConf.setJobName("wordcount");
+        jobConf.setOutputKeyClass(Text.class);
+        jobConf.setOutputValueClass(IntWritable.class);
+
+        // Mapper, combiner and reducer are all enabled.
+        setTasksClasses(jobConf, true, true, true);
+
+        FileInputFormat.setInputPaths(jobConf, new Path(input));
+        FileOutputFormat.setOutputPath(jobConf, new Path(output));
+
+        return jobConf;
+    }
+
+    /**
+     * Sets the requested task classes, together with the matching input/output formats, on the configuration.
+     *
+     * @param jobConf Configuration to change.
+     * @param setMapper Option to set mapper and input format classes.
+     * @param setCombiner Option to set combiner class.
+     * @param setReducer Option to set reducer and output format classes.
+     */
+    public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) {
+        if (setMapper) {
+            jobConf.setMapperClass(HadoopWordCount1Map.class);
+            jobConf.setInputFormat(TextInputFormat.class);
+        }
+
+        if (setCombiner)
+            jobConf.setCombinerClass(HadoopWordCount1Reduce.class); // Same class as the reducer.
+
+        if (setReducer) {
+            jobConf.setReducerClass(HadoopWordCount1Reduce.class);
+            jobConf.setOutputFormat(TextOutputFormat.class);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java
new file mode 100644
index 0000000..6a98a24
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Map.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator;
+
+/**
+ * Mapper phase of WordCount job: emits a {@code (word, 1)} pair for every token of the input line.
+ */
+public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+    /** Writable constant {@code 1} emitted as the count for each found word. */
+    private static final IntWritable one = new IntWritable(1);
+
+    /** Reusable writable container for the word being emitted. */
+    private Text word = new Text();
+
+    /** Set in {@link #configure(JobConf)}; asserted in {@code map} to check that mapper was configured before run. */
+    private boolean wasConfigured;
+
+    /** {@inheritDoc} */
+    @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter)
+            throws IOException {
+
+        assert wasConfigured : "Mapper should be configured";
+
+        String line = val.toString();
+
+        StringTokenizer tokenizer = new StringTokenizer(line);
+
+        while (tokenizer.hasMoreTokens()) {
+            word.set(tokenizer.nextToken());
+
+            output.collect(word, one);
+        }
+
+        HadoopErrorSimulator.instance().onMap();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void configure(JobConf job) {
+        super.configure(job);
+
+        wasConfigured = true;
+
+        HadoopErrorSimulator.instance().onMapConfigure();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        super.close();
+
+        HadoopErrorSimulator.instance().onMapClose();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java
new file mode 100644
index 0000000..ab91e0c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount1Reduce.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator;
+
+/**
+ * Combiner and Reducer phase of WordCount job: sums the counts received for each word.
+ */
+public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+    /** Set in {@link #configure(JobConf)}; asserted in {@code reduce} to check that reducer was configured before run. */
+    private boolean wasConfigured;
+
+    /** {@inheritDoc} */
+    @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
+            throws IOException {
+        assert wasConfigured : "Reducer should be configured";
+
+        int sum = 0;
+
+        while (values.hasNext())
+            sum += values.next().get();
+
+        output.collect(key, new IntWritable(sum));
+
+        HadoopErrorSimulator.instance().onReduce();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void configure(JobConf job) {
+        super.configure(job);
+
+        wasConfigured = true;
+
+        HadoopErrorSimulator.instance().onReduceConfigure();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java
new file mode 100644
index 0000000..3ddc923
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * Example WordCount job (new {@code mapreduce} API) for testing hadoop task execution.
+ */
+public class HadoopWordCount2 {
+    /**
+     * Entry point: builds the job from the command line and submits it.
+     *
+     * @param args Command line parameters: input file name and output directory name.
+     * @throws Exception If fails.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println("usage: [input] [output]");
+            System.exit(-1);
+        }
+
+        Job job = getJob(args[0], args[1]);
+
+        job.submit();
+    }
+
+    /**
+     * Gets fully configured Job instance (mapper, combiner and reducer set, output compression off).
+     *
+     * @param input Input file name.
+     * @param output Output directory name.
+     * @return Job instance.
+     * @throws IOException If fails.
+     */
+    public static Job getJob(String input, String output) throws IOException {
+        Job job = Job.getInstance();
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        setTasksClasses(job, true, true, true, false);
+
+        FileInputFormat.setInputPaths(job, new Path(input));
+        FileOutputFormat.setOutputPath(job, new Path(output));
+
+        job.setJarByClass(HadoopWordCount2.class);
+
+        return job;
+    }
+
+    /**
+     * Sets task classes with related info if needed into the job.
+     * @param job Job to change.
+     * @param setMapper Option to set mapper and input format classes.
+     * @param setCombiner Option to set combiner class.
+     * @param setReducer Option to set reducer and output format classes.
+     * @param outputCompression Option to replace the output format with Snappy block-compressed sequence files.
+     */
+    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer,
+            boolean outputCompression) {
+        if (setMapper) {
+            job.setMapperClass(HadoopWordCount2Mapper.class);
+            job.setInputFormatClass(TextInputFormat.class);
+        }
+
+        if (setCombiner)
+            job.setCombinerClass(HadoopWordCount2Combiner.class);
+
+        if (setReducer) {
+            job.setReducerClass(HadoopWordCount2Reducer.class);
+            job.setOutputFormatClass(TextOutputFormat.class);
+        }
+
+        if (outputCompression) {
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
+
+            SequenceFileOutputFormat.setCompressOutput(job, true);
+
+            job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java
new file mode 100644
index 0000000..a643a92
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Combiner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import java.io.IOException;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator;
+
+/**
+ * Combiner: reuses the reducer logic but routes the error hooks to the combine callbacks of the error simulator.
+ */
+public class HadoopWordCount2Combiner extends HadoopWordCount2Reducer {
+    /** {@inheritDoc} */
+    @Override protected void configError() {
+        HadoopErrorSimulator.instance().onCombineConfigure();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onCombineSetup();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void reduceError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onCombine();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onCombineCleanup();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java
new file mode 100644
index 0000000..336db84
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Mapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator;
+
+/**
+ * Mapper phase of WordCount job: writes a {@code (word, 1)} pair for every token of the input value.
+ */
+public class HadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable {
+    /** Reusable writable container for the word being written. */
+    private Text word = new Text();
+
+    /** Writable constant {@code 1} written as the count for each found word. */
+    private static final IntWritable one = new IntWritable(1);
+
+    /** Set in {@link #setConf(Configuration)}; asserted in {@code map} to check that mapper was configured before run. */
+    private boolean wasConfigured;
+
+    /** Set in {@code setup}; asserted in {@code map} to check that mapper was set up before run. */
+    private boolean wasSetUp;
+
+    /** {@inheritDoc} */
+    @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+        assert wasConfigured : "Mapper should be configured";
+        assert wasSetUp : "Mapper should be set up";
+
+        StringTokenizer wordList = new StringTokenizer(val.toString());
+
+        while (wordList.hasMoreTokens()) {
+            word.set(wordList.nextToken());
+
+            ctx.write(word, one);
+        }
+
+        HadoopErrorSimulator.instance().onMap();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+        super.setup(ctx);
+
+        wasSetUp = true;
+
+        HadoopErrorSimulator.instance().onMapSetup();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+        super.cleanup(ctx);
+
+        HadoopErrorSimulator.instance().onMapCleanup();
+    }
+
+    /** {@inheritDoc} Only records that configuration happened; {@code conf} itself is not retained. */
+    @Override public void setConf(Configuration conf) {
+        wasConfigured = true;
+
+        HadoopErrorSimulator.instance().onMapConfigure();
+    }
+
+    /** {@inheritDoc} Always returns {@code null} because the configuration is never stored. */
+    @Override public Configuration getConf() {
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java
new file mode 100644
index 0000000..f24288e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/examples/HadoopWordCount2Reducer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.examples;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopErrorSimulator;
+
+/**
+ * Combiner and Reducer phase of WordCount job: sums the counts received for each word.
+ */
+public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable {
+    /** Reusable writable container for writing sum of word counts. */
+    private IntWritable totalWordCnt = new IntWritable();
+
+    /** Set in {@link #setConf(Configuration)}; asserted in {@code reduce} to check that reducer was configured before run. */
+    private boolean wasConfigured;
+
+    /** Set in {@code setup}; asserted in {@code reduce} to check that reducer was set up before run. */
+    private boolean wasSetUp;
+
+    /** {@inheritDoc} */
+    @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
+        assert wasConfigured : "Reducer should be configured";
+        assert wasSetUp : "Reducer should be set up";
+
+        int wordCnt = 0;
+
+        for (IntWritable value : values)
+            wordCnt += value.get();
+
+        totalWordCnt.set(wordCnt);
+
+        ctx.write(key, totalWordCnt);
+
+        reduceError();
+    }
+
+    /**
+     * Simulates reduce error if needed. Called after each key is reduced; subclasses may redirect the hook.
+     */
+    protected void reduceError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onReduce();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+
+        wasSetUp = true;
+
+        setupError();
+    }
+
+    /**
+     * Simulates setup error if needed. Called at the end of {@code setup}; subclasses may redirect the hook.
+     */
+    protected void setupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onReduceSetup();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+
+        cleanupError();
+    }
+
+    /**
+     * Simulates cleanup error if needed. Called at the end of {@code cleanup}; subclasses may redirect the hook.
+     */
+    protected void cleanupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onReduceCleanup();
+    }
+
+    /** {@inheritDoc} Only records that configuration happened; {@code conf} itself is not retained. */
+    @Override public void setConf(Configuration conf) {
+        wasConfigured = true;
+
+        configError();
+    }
+
+    /**
+     * Simulates configuration error if needed. Called from {@code setConf}; subclasses may redirect the hook.
+     */
+    protected void configError() {
+        HadoopErrorSimulator.instance().onReduceConfigure();
+    }
+
+    /** {@inheritDoc} Always returns {@code null} because the configuration is never stored. */
+    @Override public Configuration getConf() {
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java
new file mode 100644
index 0000000..8c95a0e
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/fs/KerberosHadoopFileSystemFactorySelfTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.fs;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+/**
+ * Tests {@link KerberosHadoopFileSystemFactory}: parameter validation on start and Java serialization.
+ */
+public class KerberosHadoopFileSystemFactorySelfTest extends GridCommonAbstractTest {
+    /**
+     * Tests that each of the listed key tab / principal / re-login interval combinations is rejected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testParameters() throws Exception {
+        checkParameters(null, null, -1);
+
+        checkParameters(null, null, 100);
+        checkParameters(null, "b", -1);
+        checkParameters("a", null, -1);
+
+        checkParameters(null, "b", 100);
+        checkParameters("a", null, 100);
+        checkParameters("a", "b", -1);
+    }
+
+    /**
+     * Checks that starting a factory delegate with the given parameters throws {@link IllegalArgumentException}.
+     *
+     * @param keyTab Key tab.
+     * @param keyTabPrincipal Key tab principal.
+     * @param reloginInterval Re-login interval.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    private void checkParameters(String keyTab, String keyTabPrincipal, long reloginInterval) {
+        final KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+        fac.setKeyTab(keyTab);
+        fac.setKeyTabPrincipal(keyTabPrincipal);
+        fac.setReloginInterval(reloginInterval);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                HadoopFileSystemFactoryDelegate delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(fac);
+
+                delegate.start();
+
+                return null;
+            }
+        }, IllegalArgumentException.class, null);
+    }
+
+    /**
+     * Checks serialization and deserialization of the secure factory, first with defaults and then with all properties set.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSerialization() throws Exception {
+        KerberosHadoopFileSystemFactory fac = new KerberosHadoopFileSystemFactory();
+
+        checkSerialization(fac);
+
+        fac = new KerberosHadoopFileSystemFactory();
+
+        fac.setUri("igfs://igfs@localhost:10500/");
+        fac.setConfigPaths("/a/core-sute.xml", "/b/mapred-site.xml");
+        fac.setKeyTabPrincipal("foo");
+        fac.setKeyTab("/etc/krb5.keytab");
+        fac.setReloginInterval(30 * 60 * 1000L);
+
+        checkSerialization(fac);
+    }
+
+    /**
+     * Serializes the factory, deserializes it back and checks that all properties of the copy equal the original ones.
+     *
+     * @param fac The factory to check.
+     * @throws Exception If failed.
+     */
+    private void checkSerialization(KerberosHadoopFileSystemFactory fac) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        ObjectOutput oo = new ObjectOutputStream(baos);
+
+        oo.writeObject(fac);
+
+        ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+        KerberosHadoopFileSystemFactory fac2 = (KerberosHadoopFileSystemFactory)in.readObject();
+
+        assertEquals(fac.getUri(), fac2.getUri());
+        Assert.assertArrayEquals(fac.getConfigPaths(), fac2.getConfigPaths());
+        assertEquals(fac.getKeyTab(), fac2.getKeyTab());
+        assertEquals(fac.getKeyTabPrincipal(), fac2.getKeyTabPrincipal());
+        assertEquals(fac.getReloginInterval(), fac2.getReloginInterval());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java
new file mode 100644
index 0000000..a585e54
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1DualAbstractTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.util.ChainedUserNameMapper;
+import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
+import org.apache.ignite.hadoop.util.UserNameMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.processors.igfs.IgfsDualAbstractSelfTest;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+
+/**
+ * Abstract test for Hadoop 1.0 file system stack.
+ */
+public abstract class Hadoop1DualAbstractTest extends IgfsDualAbstractSelfTest {
+    /** Secondary grid name */
+    private static final String GRID_NAME = "grid_secondary";
+
+    /** Secondary file system name */
+    private static final String IGFS_NAME = "igfs_secondary";
+
+    /** Secondary file system IPC endpoint port */
+    private static final int PORT = 11500;
+
+    /** Secondary file system IPC endpoint configuration: TCP endpoint on {@link #PORT}. */
+    private static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration() {{
+        setType(IgfsIpcEndpointType.TCP);
+        setPort(PORT);
+    }};
+
+    /** Secondary file system authority. */
+    private static final String SECONDARY_AUTHORITY = IGFS_NAME + ":" + GRID_NAME + "@127.0.0.1:" + PORT;
+
+    /** Secondary Fs configuration full path; assigned in {@link #prepareConfiguration()}. */
+    protected String secondaryConfFullPath;
+
+    /** Secondary Fs URI; assigned in {@link #prepareConfiguration()}. */
+    protected String secondaryUri;
+
+    /** @param mode IGFS mode passed to the parent test. */
+    public Hadoop1DualAbstractTest(IgfsMode mode) {
+        super(mode);
+    }
+
+    /**
+     * Starts the underlying grid, prepares configuration and builds the Hadoop-based secondary file system.
+     * @return Secondary file system backed by a caching factory with a chained user name mapper.
+     * @throws Exception On failure.
+     */
+    @Override protected IgfsSecondaryFileSystem createSecondaryFileSystemStack() throws Exception {
+        startUnderlying();
+
+        prepareConfiguration();
+
+        KerberosUserNameMapper mapper1 = new KerberosUserNameMapper();
+
+        mapper1.setRealm("TEST.COM");
+
+        TestUserNameMapper mapper2 = new TestUserNameMapper();
+
+        ChainedUserNameMapper mapper = new ChainedUserNameMapper();
+
+        mapper.setMappers(mapper1, mapper2);
+
+        CachingHadoopFileSystemFactory factory = new CachingHadoopFileSystemFactory();
+
+        factory.setUri(secondaryUri);
+        factory.setConfigPaths(secondaryConfFullPath);
+        factory.setUserNameMapper(mapper);
+
+        IgniteHadoopIgfsSecondaryFileSystem second = new IgniteHadoopIgfsSecondaryFileSystem();
+
+        second.setFileSystemFactory(factory);
+
+        igfsSecondary = new HadoopIgfsSecondaryFileSystemTestAdapter(factory);
+
+        return second;
+    }
+
+    /**
+     * Starts underlying Ignite grid with the secondary IGFS in {@code PRIMARY} mode.
+     * @throws Exception On failure.
+     */
+    protected void startUnderlying() throws Exception {
+        startGridWithIgfs(GRID_NAME, IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG, secondaryIpFinder);
+    }
+
+    /**
+     * Prepares Fs configuration: writes it to a file and initializes the secondary configuration path and URI.
+     * @throws IOException On failure.
+     */
+    protected void prepareConfiguration() throws IOException {
+        Configuration secondaryConf = HadoopSecondaryFileSystemConfigurationTest.configuration(IGFS_SCHEME, SECONDARY_AUTHORITY, true, true);
+
+        secondaryConf.setInt("fs.igfs.block.size", 1024);
+
+        secondaryConfFullPath = HadoopSecondaryFileSystemConfigurationTest.writeConfiguration(secondaryConf, HadoopSecondaryFileSystemConfigurationTest.SECONDARY_CFG_PATH);
+
+        secondaryUri = HadoopSecondaryFileSystemConfigurationTest.mkUri(IGFS_SCHEME, SECONDARY_AUTHORITY);
+    }
+
+    /**
+     * Test user name mapper: cuts the name at the first {@code '@'} and asserts that it was started before use.
+     */
+    private static class TestUserNameMapper implements UserNameMapper, LifecycleAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Started flag; set in {@link #start()} and asserted in {@code map}. */
+        private boolean started;
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String map(String name) {
+            assert started;
+            assert name != null && name.contains("@");
+
+            return name.substring(0, name.indexOf("@"));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteException {
+            started = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() throws IgniteException {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java
new file mode 100644
index 0000000..97cc7e9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualAsyncTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.igfs.IgfsMode;
+
+/**
+ * DUAL_ASYNC mode test.
+ */
+public class Hadoop1OverIgfsDualAsyncTest extends Hadoop1DualAbstractTest {
+    /**
+     * Constructor.
+     */
+    public Hadoop1OverIgfsDualAsyncTest() {
+        super(IgfsMode.DUAL_ASYNC);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java
new file mode 100644
index 0000000..12036bc
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsDualSyncTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.ignite.igfs.IgfsMode;
+
+/**
+ * DUAL_SYNC mode test.
+ */
+public class Hadoop1OverIgfsDualSyncTest extends Hadoop1DualAbstractTest {
+    /**
+     * Constructor.
+     */
+    public Hadoop1OverIgfsDualSyncTest() {
+        super(IgfsMode.DUAL_SYNC);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
new file mode 100644
index 0000000..7cf7f2d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopFIleSystemFactorySelfTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.igfs.IgfsCommonAbstractTest;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests for Hadoop file system factory.
+ */
+public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
+    /** Number of "start" invocations. */
+    private static final AtomicInteger START_CNT = new AtomicInteger();
+
+    /** Number of "stop" invocations. */
+    private static final AtomicInteger STOP_CNT = new AtomicInteger();
+
+    /** Path to secondary file system configuration. */
+    private static final String SECONDARY_CFG_PATH = "/work/core-site-HadoopFIleSystemFactorySelfTest.xml";
+
+    /** Hadoop path for DUAL mode. */
+    private static final Path PATH_DUAL = new Path("/ignite/sync/test_dir");
+
+    /** Hadoop path for PROXY mode. */
+    private static final Path PATH_PROXY = new Path("/ignite/proxy/test_dir");
+
+    /** IGFS path for DUAL mode. */
+    private static final IgfsPath IGFS_PATH_DUAL = new IgfsPath("/ignite/sync/test_dir");
+
+    /** IGFS path for PROXY mode. */
+    private static final IgfsPath IGFS_PATH_PROXY = new IgfsPath("/ignite/proxy/test_dir");
+
+    /** Secondary IGFS. */
+    private IgfsEx secondary;
+
+    /** Primary IGFS. */
+    private IgfsEx primary;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        START_CNT.set(0);
+        STOP_CNT.set(0);
+
+        secondary = startSecondary();
+        primary = startPrimary();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        secondary = null;
+        primary = null;
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test custom factory.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void testCustomFactory() throws Exception {
+        assert START_CNT.get() == 1;
+        assert STOP_CNT.get() == 0;
+
+        // Use IGFS directly.
+        primary.mkdirs(IGFS_PATH_DUAL);
+
+        assert primary.exists(IGFS_PATH_DUAL);
+        assert secondary.exists(IGFS_PATH_DUAL);
+
+        // Create remote instance.
+        FileSystem fs = FileSystem.get(URI.create("igfs://primary:primary@127.0.0.1:10500/"), baseConfiguration());
+
+        // Ensure lifecycle callback was invoked.
+        assert START_CNT.get() == 2;
+        assert STOP_CNT.get() == 0;
+
+        // Check file system operations.
+        assert fs.exists(PATH_DUAL);
+
+        assert fs.delete(PATH_DUAL, true);
+        assert !primary.exists(IGFS_PATH_DUAL);
+        assert !secondary.exists(IGFS_PATH_DUAL);
+        assert !fs.exists(PATH_DUAL);
+
+        assert fs.mkdirs(PATH_DUAL);
+        assert primary.exists(IGFS_PATH_DUAL);
+        assert secondary.exists(IGFS_PATH_DUAL);
+        assert fs.exists(PATH_DUAL);
+
+        assert fs.mkdirs(PATH_PROXY);
+        assert secondary.exists(IGFS_PATH_PROXY);
+        assert fs.exists(PATH_PROXY);
+
+        // Close file system and ensure that associated factory was notified.
+        fs.close();
+
+        assert START_CNT.get() == 2;
+        assert STOP_CNT.get() == 1;
+
+        // Stop primary node and ensure that base factory was notified.
+        G.stop(primary.context().kernalContext().grid().name(), true);
+
+        assert START_CNT.get() == 2;
+        assert STOP_CNT.get() == 2;
+    }
+
+    /**
+     * Start secondary IGFS.
+     *
+     * @return IGFS.
+     * @throws Exception If failed.
+     */
+    private static IgfsEx startSecondary() throws Exception {
+        return start("secondary", 11500, IgfsMode.PRIMARY, null);
+    }
+
+    /**
+     * Start primary IGFS.
+     *
+     * @return IGFS.
+     * @throws Exception If failed.
+     */
+    private static IgfsEx startPrimary() throws Exception {
+        // Prepare configuration.
+        Configuration conf = baseConfiguration();
+
+        conf.set("fs.defaultFS", "igfs://secondary:secondary@127.0.0.1:11500/");
+
+        writeConfigurationToFile(conf);
+
+        // Create the caching factory that the test factory will wrap.
+        CachingHadoopFileSystemFactory delegate = new CachingHadoopFileSystemFactory();
+
+        delegate.setUri("igfs://secondary:secondary@127.0.0.1:11500/");
+        delegate.setConfigPaths(SECONDARY_CFG_PATH);
+
+        // Configure factory.
+        TestFactory factory = new TestFactory(delegate);
+
+        // Configure file system.
+        IgniteHadoopIgfsSecondaryFileSystem secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem();
+
+        secondaryFs.setFileSystemFactory(factory);
+
+        // Start.
+        return start("primary", 10500, IgfsMode.DUAL_ASYNC, secondaryFs);
+    }
+
+    /**
+     * Start Ignite node with IGFS instance.
+     *
+     * @param name Node and IGFS name.
+     * @param endpointPort Endpoint port.
+     * @param dfltMode Default path mode.
+     * @param secondaryFs Secondary file system.
+     * @return IGFS instance.
+     */
+    private static IgfsEx start(String name, int endpointPort, IgfsMode dfltMode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs) {
+        IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+
+        endpointCfg.setType(IgfsIpcEndpointType.TCP);
+        endpointCfg.setHost("127.0.0.1");
+        endpointCfg.setPort(endpointPort);
+
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(name);
+        igfsCfg.setDefaultMode(dfltMode);
+        igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setInitializeDefaultPathModes(true);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(name);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return (IgfsEx)G.start(cfg).fileSystem(name);
+    }
+
+    /**
+     * Create base FileSystem configuration.
+     *
+     * @return Configuration.
+     */
+    private static Configuration baseConfiguration() {
+        Configuration conf = new Configuration();
+
+        conf.set("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        return conf;
+    }
+
+    /**
+     * Write configuration to file.
+     *
+     * @param conf Configuration.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private static void writeConfigurationToFile(Configuration conf) throws Exception {
+        final String path = U.getIgniteHome() + SECONDARY_CFG_PATH;
+
+        File file = new File(path);
+
+        file.delete();
+
+        assertFalse(file.exists());
+
+        try (FileOutputStream fos = new FileOutputStream(file)) {
+            conf.writeXml(fos);
+        }
+
+        assertTrue(file.exists());
+    }
+
+    /**
+     * Test factory.
+     */
+    private static class TestFactory implements HadoopFileSystemFactory, LifecycleAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** File system factory. */
+        private CachingHadoopFileSystemFactory factory;
+
+        /** File system factory delegate. */
+        private transient HadoopFileSystemFactoryDelegate delegate;
+
+        /**
+         * Constructor.
+         *
+         * @param factory File system factory.
+         */
+        public TestFactory(CachingHadoopFileSystemFactory factory) {
+            this.factory = factory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object get(String usrName) throws IOException {
+            return delegate.get(usrName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start() throws IgniteException {
+            delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(factory);
+
+            delegate.start();
+
+            START_CNT.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop() throws IgniteException {
+            STOP_CNT.incrementAndGet();
+        }
+    }
+}


Mime
View raw message