ignite-commits mailing list archives

From voze...@apache.org
Subject [10/15] ignite git commit: IGNITE-2354: Hadoop: Added tests for errors thrown on different MR stages. This closes #622.
Date Wed, 13 Apr 2016 14:07:13 GMT
IGNITE-2354: Hadoop: Added tests for errors thrown on different MR stages. This closes #622.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d9f4f6e8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d9f4f6e8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d9f4f6e8

Branch: refs/heads/ignite-1786
Commit: d9f4f6e8dfdab37a730651b8850ebb976d4b1d72
Parents: 34fc271
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Tue Apr 12 16:15:37 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Apr 12 16:15:37 2016 +0300

----------------------------------------------------------------------
 .../hadoop/HadoopAbstractMapReduceTest.java     | 405 +++++++++++++++++++
 .../processors/hadoop/HadoopErrorSimulator.java | 326 +++++++++++++++
 .../HadoopMapReduceErrorResilienceTest.java     | 154 +++++++
 .../processors/hadoop/HadoopMapReduceTest.java  | 380 +----------------
 .../hadoop/HadoopSnappyFullMapReduceTest.java   |   8 +
 .../hadoop/examples/HadoopWordCount1Map.java    |  12 +
 .../hadoop/examples/HadoopWordCount1Reduce.java |   5 +
 .../hadoop/examples/HadoopWordCount2.java       |   2 +-
 .../examples/HadoopWordCount2Combiner.java      |  45 +++
 .../hadoop/examples/HadoopWordCount2Mapper.java |  19 +-
 .../examples/HadoopWordCount2Reducer.java       |  43 +-
 .../testsuites/IgniteHadoopTestSuite.java       |   8 +-
 12 files changed, 1038 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
new file mode 100644
index 0000000..d09ec61
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractMapReduceTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsUserContext;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
+
+/**
+ * Abstract test of the whole cycle of map-reduce processing via Job tracker.
+ */
+public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
+    /** IGFS block size. */
+    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+    /** Amount of blocks to prefetch. */
+    protected static final int PREFETCH_BLOCKS = 1;
+
+    /** Amount of sequential block reads before prefetch is triggered. */
+    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+    /** Secondary file system URI. */
+    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+    /** Secondary file system configuration path. */
+    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+    /** The user to run the Hadoop job on behalf of. */
+    protected static final String USER = "vasya";
+
+    /** Secondary IGFS name. */
+    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+    /** Expected count of the word "red" in the test input. */
+    protected static final int red = 10_000;
+
+    /** Expected count of the word "blue" in the test input. */
+    protected static final int blue = 20_000;
+
+    /** Expected count of the word "green" in the test input. */
+    protected static final int green = 15_000;
+
+    /** Expected count of the word "yellow" in the test input. */
+    protected static final int yellow = 7_000;
+
+    /** The secondary Ignite node. */
+    protected Ignite igniteSecondary;
+
+    /** The secondary file system. */
+    protected IgfsSecondaryFileSystem secondaryFs;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * Gets owner of an IgfsEx path.
+     *
+     * @param i The IgfsEx instance.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwner(IgfsEx i, IgfsPath p) {
+        return i.info(p).property(IgfsUtils.PROP_USER_NAME);
+    }
+
+    /**
+     * Gets owner of a secondary file system path.
+     *
+     * @param secFs The secondary file system.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+            @Override public String apply() {
+                return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
+            }
+        });
+    }
+
+    /**
+     * Checks owner of the path.
+     * @param p The path.
+     */
+    private void checkOwner(IgfsPath p) {
+        String ownerPrim = getOwner(igfs, p);
+        assertEquals(USER, ownerPrim);
+
+        String ownerSec = getOwnerSecondary(secondaryFs, p);
+        assertEquals(USER, ownerSec);
+    }
+
+    /**
+     * Does the actual test job.
+     *
+     * @param inFile Input file path.
+     * @param useNewMapper Flag to use new mapper API.
+     * @param useNewCombiner Flag to use new combiner API.
+     * @param useNewReducer Flag to use new reducer API.
+     * @throws Exception If failed.
+     */
+    protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
+        throws Exception {
+        igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+        JobConf jobConf = new JobConf();
+
+        jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
+        jobConf.setUser(USER);
+        jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
+
+        // To split into about 40 items for v2
+        jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+        // For v1
+        jobConf.setInt("fs.local.block.size", 65000);
+
+        // File system coordinates.
+        setupFileSystems(jobConf);
+
+        HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
+
+        Job job = Job.getInstance(jobConf);
+
+        HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+        job.setJarByClass(HadoopWordCount2.class);
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        checkJobStatistics(jobId);
+
+        final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+        checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+        checkOwner(new IgfsPath(outFile));
+
+        String actual = readAndSortFile(outFile, job.getConfiguration());
+
+        assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
+                useNewReducer,
+            "blue\t" + blue + "\n" +
+                "green\t" + green + "\n" +
+                "red\t" + red + "\n" +
+                "yellow\t" + yellow + "\n",
+            actual
+        );
+    }
+
+    /**
+     * Tells whether to compress output data with Snappy.
+     *
+     * @return {@code True} if output data should be compressed with Snappy.
+     */
+    protected boolean compressOutputSnappy() {
+        return false;
+    }
+
+    /**
+     * Performs a simple check of job statistics.
+     *
+     * @param jobId Job id.
+     * @throws IgniteCheckedException If failed.
+     * @throws IOException If an I/O error occurs.
+     */
+    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
+        HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
+
+        Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>();
+
+        Map<String, Integer> phaseOrders = new HashMap<>();
+        phaseOrders.put("submit", 0);
+        phaseOrders.put("prepare", 1);
+        phaseOrders.put("start", 2);
+        phaseOrders.put("Cstart", 3);
+        phaseOrders.put("finish", 4);
+
+        String prevTaskId = null;
+
+        long apiEvtCnt = 0;
+
+        for (T2<String, Long> evt : perfCntr.evts()) {
+            // We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
+            String[] parsedEvt = evt.get1().split(" ");
+
+            String taskId;
+            String taskPhase;
+
+            if ("JOB".equals(parsedEvt[0])) {
+                taskId = parsedEvt[0];
+                taskPhase = parsedEvt[1];
+            }
+            else {
+                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
+                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
+            }
+
+            if (!taskId.equals(prevTaskId))
+                tasks.put(taskId, new TreeMap<Integer,Long>());
+
+            Integer pos = phaseOrders.get(taskPhase);
+
+            assertNotNull("Invalid phase " + taskPhase, pos);
+
+            tasks.get(taskId).put(pos, evt.get2());
+
+            prevTaskId = taskId;
+
+            apiEvtCnt++;
+        }
+
+        for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) {
+            Map<Integer, Long> order = task.getValue();
+
+            long prev = 0;
+
+            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
+                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
+
+                prev = phase.getValue();
+            }
+        }
+
+        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
+
+        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return igfs.exists(statPath);
+            }
+        }, 20_000);
+
+        final long apiEvtCnt0 = apiEvtCnt;
+
+        boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
+                        return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader);
+                    }
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, 10000);
+
+        if (!res) {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)));
+
+            assert false : "Invalid API events count [exp=" + apiEvtCnt0 +
+                ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+        super.beforeTest();
+    }
+
+    /**
+     * Start grid with IGFS.
+     *
+     * @param gridName Grid name.
+     * @param igfsName IGFS name.
+     * @param mode IGFS mode.
+     * @param secondaryFs Secondary file system (optional).
+     * @param restCfg Rest configuration string (optional).
+     * @return Started grid instance.
+     * @throws Exception If failed.
+     */
+    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(igfsName);
+        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setIpcEndpointConfiguration(restCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setNearConfiguration(null);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return G.start(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+        fsCfg.setSecondaryFileSystem(secondaryFs);
+
+        return fsCfg;
+    }
+}
\ No newline at end of file
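
As an aside, checkJobStatistics() in the file above folds each raw performance-counter event string into a task id and a phase name before validating phase ordering. Below is a minimal illustrative sketch of that mapping; the parse() helper is hypothetical and not part of the commit, and the event strings follow the "COMBINE 1 run <uuid>" pattern noted in the code:

    // Hypothetical helper, for illustration only: mirrors the parsing logic of checkJobStatistics().
    static String[] parse(String rawEvt) {
        String[] parts = rawEvt.split(" ");

        // Job-level events keep their name as the task id, e.g. "JOB submit <uuid>" -> {"JOB", "submit"}.
        if ("JOB".equals(parts[0]))
            return new String[] {parts[0], parts[1]};

        // Combiner events are attributed to the corresponding MAP task and get a "C" phase prefix,
        // e.g. "COMBINE 1 start <uuid>" -> {"MAP1", "Cstart"}; "REDUCE 2 finish <uuid>" -> {"RED2", "finish"}.
        String taskId = ("COMBINE".equals(parts[0]) ? "MAP" : parts[0].substring(0, 3)) + parts[1];
        String phase = ("COMBINE".equals(parts[0]) ? "C" : "") + parts[2];

        return new String[] {taskId, phase};
    }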

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java
new file mode 100644
index 0000000..843b42b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopErrorSimulator.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Pluggable error simulator: the example map-reduce tasks invoke it at each stage, so tests
+ * can inject a failure at a chosen map-reduce stage.
+ */
+public class HadoopErrorSimulator {
+    /** No-op singleton instance. */
+    public static final HadoopErrorSimulator noopInstance = new HadoopErrorSimulator();
+
+    /** Instance ref. */
+    private static final AtomicReference<HadoopErrorSimulator> ref = new AtomicReference<>(noopInstance);
+
+    /**
+     * Creates simulator of given kind with given stage bits.
+     *
+     * @param kind The kind.
+     * @param bits The stage bits.
+     * @return The simulator.
+     */
+    public static HadoopErrorSimulator create(Kind kind, int bits) {
+        switch (kind) {
+            case Noop:
+                return noopInstance;
+            case Runtime:
+                return new RuntimeExceptionBitHadoopErrorSimulator(bits);
+            case IOException:
+                return new IOExceptionBitHadoopErrorSimulator(bits);
+            case Error:
+                return new ErrorBitHadoopErrorSimulator(bits);
+            default:
+                throw new IllegalStateException("Unknown kind: " + kind);
+        }
+    }
+
+    /**
+     * Gets the error simulator instance.
+     */
+    public static HadoopErrorSimulator instance() {
+        return ref.get();
+    }
+
+    /**
+     * Atomically replaces the current instance, if it equals the expected one.
+     *
+     * @param expect Expected current instance.
+     * @param update New instance to set.
+     * @return {@code True} if the instance was replaced.
+     */
+    public static boolean setInstance(HadoopErrorSimulator expect, HadoopErrorSimulator update) {
+        return ref.compareAndSet(expect, update);
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopErrorSimulator() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMap() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapClose() throws IOException {
+        // no-op
+    }
+
+    /**
+     * setConf() does not declare IOException to be thrown.
+     */
+    public void onCombineConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombineSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombine() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombineCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduce() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Error kind.
+     */
+    public enum Kind {
+        /** No error. */
+        Noop,
+
+        /** Runtime. */
+        Runtime,
+
+        /** IOException. */
+        IOException,
+
+        /** java.lang.Error. */
+        Error
+    }
+
+    /**
+     * Runtime error simulator.
+     */
+    public static class RuntimeExceptionBitHadoopErrorSimulator extends HadoopErrorSimulator {
+        /** Stage bits: defines what map-reduce stages will cause errors. */
+        private final int bits;
+
+        /**
+         * Constructor.
+         *
+         * @param b Stage bits: defines which map-reduce stages will cause errors.
+         */
+        protected RuntimeExceptionBitHadoopErrorSimulator(int b) {
+            bits = b;
+        }
+
+        /**
+         * Simulates an error.
+         */
+        protected void simulateError() throws IOException {
+            throw new RuntimeException("An error simulated by " + getClass().getSimpleName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapConfigure() {
+            try {
+                if ((bits & 1) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapSetup() throws IOException, InterruptedException {
+            if ((bits & 2) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMap() throws IOException {
+            if ((bits & 4) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapCleanup() throws IOException, InterruptedException {
+            if ((bits & 8) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineConfigure() {
+            try {
+                if ((bits & 16) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineSetup() throws IOException, InterruptedException {
+            if ((bits & 32) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombine() throws IOException {
+            if ((bits & 64) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineCleanup() throws IOException, InterruptedException {
+            if ((bits & 128) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceConfigure() {
+            try {
+                if ((bits & 256) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceSetup() throws IOException, InterruptedException {
+            if ((bits & 512) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduce() throws IOException {
+            if ((bits & 1024) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceCleanup() throws IOException, InterruptedException {
+            if ((bits & 2048) != 0)
+                simulateError();
+        }
+    }
+
+    /**
+     * java.lang.Error simulator.
+     */
+    public static class ErrorBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * Constructor.
+         */
+        public ErrorBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() {
+            throw new Error("An error simulated by " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * IOException simulator.
+     */
+    public static class IOExceptionBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * Constructor.
+         */
+        public IOExceptionBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() throws IOException {
+            throw new IOException("An IOException simulated by " + getClass().getSimpleName());
+        }
+    }
+}
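
The simulators above gate each stage on a bit of the mask passed to create(): bits 1/2/4/8 cover map configure/setup/map/cleanup, 16/32/64/128 the combiner stages, and 256/512/1024/2048 the reducer stages. A minimal usage sketch, illustrative only and not part of the commit, assuming a word-count job wired to the instrumented example classes:

    // Illustrative sketch: install a simulator that throws an IOException in map(),
    // run the instrumented job, then restore the no-op simulator.
    void runWithSimulatedMapError() throws Exception {
        // Bit 4 selects the map() stage (see RuntimeExceptionBitHadoopErrorSimulator above).
        HadoopErrorSimulator sim = HadoopErrorSimulator.create(HadoopErrorSimulator.Kind.IOException, 4);

        if (HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim)) {
            try {
                // Submit the word-count job here; the example mapper calls
                // HadoopErrorSimulator.instance().onMap(), so the job fails during the map stage.
            }
            finally {
                // Restore the no-op simulator so subsequent runs succeed.
                HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance);
            }
        }
    }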

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
new file mode 100644
index 0000000..dd12935
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceErrorResilienceTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
+
+/**
+ * Test of resilience to errors thrown during map-reduce job execution.
+ * Combinations tested:
+ * { new API, old API }
+ *   x { unchecked exception, checked exception, error }
+ *   x { phase where the error happens }.
+ */
+public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_IOException() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Error() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_IOException() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Error() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * Tests correct work after an error.
+     *
+     * @param useNewBits Bit mask selecting the old API: a set bit means the old API is used
+     *      (1 - mapper, 2 - combiner, 4 - reducer).
+     * @param simulatorKind Error simulator kind.
+     * @throws Exception On error.
+     */
+    private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
+        try {
+            IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+            igfs.mkdirs(inDir);
+
+            IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+            generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+            boolean useNewMapper = (useNewBits & 1) == 0;
+            boolean useNewCombiner = (useNewBits & 2) == 0;
+            boolean useNewReducer = (useNewBits & 4) == 0;
+
+            for (int i = 0; i < 12; i++) {
+                int bits = 1 << i;
+
+                System.out.println("############################ Simulator kind = " + simulatorKind
+                    + ", Stage bits = " + bits);
+
+                HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
+
+                doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
+            }
+        }
+        catch (Throwable t) {
+            t.printStackTrace();
+
+            fail("Unexpected throwable: " + t);
+        }
+    }
+
+    /**
+     * Performs the test with the given error simulator.
+     *
+     * @param sim The simulator.
+     * @param inFile Input file.
+     * @param useNewMapper Whether to use the new mapper API.
+     * @param useNewCombiner Whether to use the new combiner API.
+     * @param useNewReducer Whether to use the new reducer API.
+     * @throws Exception If failed.
+     */
+    private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
+        boolean useNewCombiner, boolean useNewReducer) throws Exception {
+        // Set real simulating error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
+
+        try {
+            // Expect failure there:
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+        catch (Throwable t) { // This may be an Error.
+            // Expected:
+            System.out.println(t.toString()); // Ignore, continue the test.
+        }
+
+        // Set no-op error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
+
+        // Expect success there:
+        doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+    }
+}
\ No newline at end of file
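
For clarity, the useNewBits argument of doTestRecoveryAfterAnError() selects the API per component: a cleared bit picks the new (mapreduce) API and a set bit the old (mapred) API, so 0 exercises the fully new API and 7 the fully old one, while the inner loop walks every stage bit understood by HadoopErrorSimulator. A minimal sketch of the resulting matrix, illustrative only and mirroring the code above:

    // Illustrative: decoding useNewBits (1 - mapper, 2 - combiner, 4 - reducer).
    boolean useNewMapper   = (useNewBits & 1) == 0; // 0 -> new API everywhere, 7 -> old API everywhere
    boolean useNewCombiner = (useNewBits & 2) == 0;
    boolean useNewReducer  = (useNewBits & 4) == 0;

    // The inner loop covers every stage bit defined by the simulator (see HadoopErrorSimulator above):
    // i = 0..11 yields bits 1, 2, 4, ..., 2048, i.e. configure/setup/run/cleanup for map, combine and reduce.
    for (int i = 0; i < 12; i++) {
        int bits = 1 << i;
        // create(simulatorKind, bits), install it, run the job once expecting a failure,
        // restore the no-op simulator, then run again expecting success.
    }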

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 4426847..b703896 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -17,129 +17,13 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
-import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
-import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
-import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsUserContext;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount1;
 import org.apache.ignite.internal.processors.hadoop.examples.HadoopWordCount2;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheMode.REPLICATED;
-import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_COUNTER_WRITER_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.createJobInfo;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
-public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
-    /** IGFS block size. */
-    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
-
-    /** Amount of blocks to prefetch. */
-    protected static final int PREFETCH_BLOCKS = 1;
-
-    /** Amount of sequential block reads before prefetch is triggered. */
-    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
-
-    /** Secondary file system URI. */
-    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
-
-    /** Secondary file system configuration path. */
-    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
-
-    /** The user to run Hadoop job on behalf of. */
-    protected static final String USER = "vasya";
-
-    /** Secondary IGFS name. */
-    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
-
-    /** The secondary Ignite node. */
-    protected Ignite igniteSecondary;
-
-    /** The secondary Fs. */
-    protected IgfsSecondaryFileSystem secondaryFs;
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    /**
-     * Gets owner of a IgfsEx path.
-     * @param p The path.
-     * @return The owner.
-     */
-    private static String getOwner(IgfsEx i, IgfsPath p) {
-        return i.info(p).property(IgfsUtils.PROP_USER_NAME);
-    }
-
-    /**
-     * Gets owner of a secondary Fs path.
-     * @param secFs The sec Fs.
-     * @param p The path.
-     * @return The owner.
-     */
-    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
-        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
-            @Override public String apply() {
-                return secFs.info(p).property(IgfsUtils.PROP_USER_NAME);
-            }
-        });
-    }
-
-    /**
-     * Checks owner of the path.
-     * @param p The path.
-     */
-    private void checkOwner(IgfsPath p) {
-        String ownerPrim = getOwner(igfs, p);
-        assertEquals(USER, ownerPrim);
-
-        String ownerSec = getOwnerSecondary(secondaryFs, p);
-        assertEquals(USER, ownerSec);
-    }
-
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
     /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
@@ -151,260 +35,32 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
         IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
 
-        final int red = 10_000;
-        final int blue = 20_000;
-        final int green = 15_000;
-        final int yellow = 7_000;
-
         generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow );
 
-        for (int i = 0; i < 3; i++) {
-            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
-
-            boolean useNewMapper = (i & 1) == 0;
-            boolean useNewCombiner = (i & 2) == 0;
-            boolean useNewReducer = (i & 4) == 0;
-
-            JobConf jobConf = new JobConf();
-
-            jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser(USER);
-            jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
-
-            //To split into about 40 items for v2
-            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
-
-            //For v1
-            jobConf.setInt("fs.local.block.size", 65000);
-
-            // File system coordinates.
-            setupFileSystems(jobConf);
-
-            HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);
-
-            Job job = Job.getInstance(jobConf);
-
-            HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(IntWritable.class);
-
-            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
-            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
-
-            job.setJarByClass(HadoopWordCount2.class);
-
-            HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
-
-            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
-
-            fut.get();
-
-            checkJobStatistics(jobId);
-
-            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
-
-            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
-
-            checkOwner(new IgfsPath(outFile));
-
-            String actual = readAndSortFile(outFile, job.getConfiguration());
-
-            assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
-                useNewReducer,
-                "blue\t" + blue + "\n" +
-                "green\t" + green + "\n" +
-                "red\t" + red + "\n" +
-                "yellow\t" + yellow + "\n",
-                actual
-            );
-        }
-    }
-
-    /**
-     * Gets if to compress output data with Snappy.
-     *
-     * @return If to compress output data with Snappy.
-     */
-    protected boolean compressOutputSnappy() {
-        return false;
-    }
-
-    /**
-     * Simple test job statistics.
-     *
-     * @param jobId Job id.
-     * @throws IgniteCheckedException
-     */
-    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
-        HadoopCounters cntrs = grid(0).hadoop().counters(jobId);
-
-        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null);
-
-        Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>();
-
-        Map<String, Integer> phaseOrders = new HashMap<>();
-        phaseOrders.put("submit", 0);
-        phaseOrders.put("prepare", 1);
-        phaseOrders.put("start", 2);
-        phaseOrders.put("Cstart", 3);
-        phaseOrders.put("finish", 4);
-
-        String prevTaskId = null;
-
-        long apiEvtCnt = 0;
-
-        for (T2<String, Long> evt : perfCntr.evts()) {
-            //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706
-            String[] parsedEvt = evt.get1().split(" ");
-
-            String taskId;
-            String taskPhase;
-
-            if ("JOB".equals(parsedEvt[0])) {
-                taskId = parsedEvt[0];
-                taskPhase = parsedEvt[1];
-            }
-            else {
-                taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
-                taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2];
-            }
-
-            if (!taskId.equals(prevTaskId))
-                tasks.put(taskId, new TreeMap<Integer,Long>());
-
-            Integer pos = phaseOrders.get(taskPhase);
-
-            assertNotNull("Invalid phase " + taskPhase, pos);
-
-            tasks.get(taskId).put(pos, evt.get2());
-
-            prevTaskId = taskId;
-
-            apiEvtCnt++;
-        }
-
-        for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) {
-            Map<Integer, Long> order = task.getValue();
-
-            long prev = 0;
-
-            for (Map.Entry<Integer, Long> phase : order.entrySet()) {
-                assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev);
-
-                prev = phase.getValue();
-            }
-        }
-
-        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
-
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return igfs.exists(statPath);
-            }
-        }, 20_000);
-
-        final long apiEvtCnt0 = apiEvtCnt;
-
-        boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                try {
-                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
-                        return apiEvtCnt0 == HadoopTestUtils.simpleCheckJobStatFile(reader);
-                    }
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }, 10000);
+        for (boolean[] apiMode: getApiModes()) {
+            assert apiMode.length == 3;
 
-        if (!res) {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)));
+            boolean useNewMapper = apiMode[0];
+            boolean useNewCombiner = apiMode[1];
+            boolean useNewReducer = apiMode[2];
 
-            assert false : "Invalid API events count [exp=" + apiEvtCnt0 +
-                ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
-
-        super.beforeTest();
-    }
-
     /**
-     * Start grid with IGFS.
+     * Gets API mode combinations to be tested.
+     * Each boolean[] is a { newMapper, newCombiner, newReducer } flag triplet.
      *
-     * @param gridName Grid name.
-     * @param igfsName IGFS name
-     * @param mode IGFS mode.
-     * @param secondaryFs Secondary file system (optional).
-     * @param restCfg Rest configuration string (optional).
-     * @return Started grid instance.
-     * @throws Exception If failed.
-     */
-    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
-        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
-        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
-
-        igfsCfg.setDataCacheName("dataCache");
-        igfsCfg.setMetaCacheName("metaCache");
-        igfsCfg.setName(igfsName);
-        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
-        igfsCfg.setDefaultMode(mode);
-        igfsCfg.setIpcEndpointConfiguration(restCfg);
-        igfsCfg.setSecondaryFileSystem(secondaryFs);
-        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
-        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
-
-        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
-
-        dataCacheCfg.setName("dataCache");
-        dataCacheCfg.setCacheMode(PARTITIONED);
-        dataCacheCfg.setNearConfiguration(null);
-        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
-        dataCacheCfg.setBackups(0);
-        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
-        dataCacheCfg.setOffHeapMaxMemory(0);
-
-        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
-
-        metaCacheCfg.setName("metaCache");
-        metaCacheCfg.setCacheMode(REPLICATED);
-        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setGridName(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
-
-        cfg.setDiscoverySpi(discoSpi);
-        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
-        cfg.setFileSystemConfiguration(igfsCfg);
-
-        cfg.setLocalHost("127.0.0.1");
-        cfg.setConnectorConfiguration(null);
-
-        return G.start(cfg);
-    }
-
-    /**
-     * @return IGFS configuration.
+     * @return Arrays of booleans indicating API combinations to test.
      */
-    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
-        FileSystemConfiguration fsCfg = super.igfsConfiguration();
-
-        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
-
-        fsCfg.setSecondaryFileSystem(secondaryFs);
-
-        return fsCfg;
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false },
+            { false, false, true },
+            { false, true,  false },
+            { true,  false, false },
+            { true,  true,  true },
+        };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
index 22d33a5..27a5fcd 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyFullMapReduceTest.java
@@ -25,4 +25,12 @@ public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
     @Override protected boolean compressOutputSnappy() {
         return true;
     }
+
+    /** {@inheritDoc} */
+    @Override protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, true },
+            { true, true, true },
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
index 30b12bd..d4cd190 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator;
 
 /**
  * Mapper phase of WordCount job.
@@ -56,6 +57,8 @@ public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWri
 
             output.collect(word, one);
         }
+
+        HadoopErrorSimulator.instance().onMap();
     }
 
     /** {@inheritDoc} */
@@ -63,5 +66,14 @@ public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWri
         super.configure(job);
 
         wasConfigured = true;
+
+        HadoopErrorSimulator.instance().onMapConfigure();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        super.close();
+
+        HadoopErrorSimulator.instance().onMapClose();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
index 2335911..b400d9b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator;
 
 /**
  * Combiner and Reducer phase of WordCount job.
@@ -45,6 +46,8 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex
             sum += values.next().get();
 
         output.collect(key, new IntWritable(sum));
+
+        HadoopErrorSimulator.instance().onReduce();
     }
 
     /** {@inheritDoc} */
@@ -52,5 +55,7 @@ public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Tex
         super.configure(job);
 
         wasConfigured = true;
+
+        HadoopErrorSimulator.instance().onReduceConfigure();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 4b508ca..b2cfee3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -91,7 +91,7 @@ public class HadoopWordCount2 {
         }
 
         if (setCombiner)
-            job.setCombinerClass(HadoopWordCount2Reducer.class);
+            job.setCombinerClass(HadoopWordCount2Combiner.class);
 
         if (setReducer) {
             job.setReducerClass(HadoopWordCount2Reducer.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java
new file mode 100644
index 0000000..0d25e3c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Combiner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import java.io.IOException;
+import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator;
+
+/**
+ * Combiner function with pluggable error simulator.
+ */
+public class HadoopWordCount2Combiner extends HadoopWordCount2Reducer {
+    /** {@inheritDoc} */
+    @Override protected void configError() {
+        HadoopErrorSimulator.instance().onCombineConfigure();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onCombineSetup();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void reduceError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onCombine();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onCombineCleanup();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
index 0d0c128..76857e6 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator;
 
 /**
  * Mapper phase of WordCount job.
@@ -53,17 +54,31 @@ public class HadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritab
 
             ctx.write(word, one);
         }
+
+        HadoopErrorSimulator.instance().onMap();
     }
 
     /** {@inheritDoc} */
-    @Override protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
+    @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+        super.setup(ctx);
+
         wasSetUp = true;
+
+        HadoopErrorSimulator.instance().onMapSetup();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+        super.cleanup(ctx);
+
+        HadoopErrorSimulator.instance().onMapCleanup();
     }
 
     /** {@inheritDoc} */
     @Override public void setConf(Configuration conf) {
         wasConfigured = true;
+
+        HadoopErrorSimulator.instance().onMapConfigure();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
index 63a9d95..e780170 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.ignite.internal.processors.hadoop.HadoopErrorSimulator;
 
 /**
  * Combiner and Reducer phase of WordCount job.
@@ -50,6 +51,15 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In
         totalWordCnt.set(wordCnt);
 
         ctx.write(key, totalWordCnt);
+
+        reduceError();
+    }
+
+    /**
+     * Simulates reduce error if needed.
+     */
+    protected void reduceError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onReduce();
     }
 
     /** {@inheritDoc} */
@@ -57,16 +67,47 @@ public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, In
         super.setup(context);
 
         wasSetUp = true;
+
+        setupError();
+    }
+
+    /**
+     * Simulates setup error if needed.
+     */
+    protected void setupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onReduceSetup();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+
+        cleanupError();
+    }
+
+    /**
+     * Simulates cleanup error if needed.
+     */
+    protected void cleanupError() throws IOException, InterruptedException {
+        HadoopErrorSimulator.instance().onReduceCleanup();
     }
 
     /** {@inheritDoc} */
     @Override public void setConf(Configuration conf) {
         wasConfigured = true;
+
+        configError();
+    }
+
+    /**
+     * Simulates configuration error if needed.
+     */
+    protected void configError() {
+        HadoopErrorSimulator.instance().onReduceConfigure();
     }
 
     /** {@inheritDoc} */
     @Override public Configuration getConf() {
         return null;
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9f4f6e8/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 3358e18..554cbc7 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopFileSystemsTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopGroupingTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobTrackerSelfTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceEmbeddedSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceErrorResilienceTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopNoHadoopMapReduceTest;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerializationWrapperSelfTest;
@@ -168,6 +169,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopNoHadoopMapReduceTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceErrorResilienceTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceEmbeddedSelfTest.class.getName())));
 
@@ -246,15 +248,15 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         X.println("tmp: " + tmpPath);
 
-        File install = new File(tmpPath + File.separatorChar + "__hadoop");
+        final File install = new File(tmpPath + File.separatorChar + "__hadoop");
 
-        File home = new File(install, destName);
+        final File home = new File(install, destName);
 
         X.println("Setting " + homeVariable + " to " + home.getAbsolutePath());
 
         System.setProperty(homeVariable, home.getAbsolutePath());
 
-        File successFile = new File(home, "__success");
+        final File successFile = new File(home, "__success");
 
         if (home.exists()) {
             if (successFile.exists()) {

