ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [05/45] incubator-ignite git commit: IGNITE-386: Squashed changes.
Date Thu, 05 Mar 2015 07:30:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
deleted file mode 100644
index af3f872..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import com.google.common.base.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Tests of Map, Combine and Reduce task executions of any version of hadoop API.
- */
-abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCountTest {
-    /** Empty hosts array. */
-    private static final String[] HOSTS = new String[0];
-
-    /**
-     * Creates some grid hadoop job. Override this method to create tests for any job implementation.
-     *
-     * @param inFile Input file name for the job.
-     * @param outFile Output file name for the job.
-     * @return Hadoop job.
-     * @throws IOException If fails.
-     */
-    public abstract GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
-
-    /**
-     * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
-     */
-    public abstract String getOutputFileNamePrefix();
-
-    /**
-     * Tests map task execution.
-     *
-     * @throws Exception If fails.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public void testMapTask() throws Exception {
-        IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
-        igfs.mkdirs(inDir);
-
-        IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input");
-
-        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
-
-        try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) {
-            pw.println("hello0 world0");
-            pw.println("world1 hello1");
-        }
-
-        GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1);
-
-        try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) {
-            pw.println("hello2 world2");
-            pw.println("world3 hello3");
-        }
-        GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
-                igfs.info(inFile).length() - fileBlock1.length());
-
-        GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
-
-        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
-
-        GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob);
-
-        ctx.mockOutput().clear();
-
-        ctx.run();
-
-        assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput()));
-
-        ctx.mockOutput().clear();
-
-        ctx.taskInfo(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2));
-
-        ctx.run();
-
-        assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput()));
-    }
-
-    /**
-     * Generates input data for reduce-like operation into mock context input and runs the operation.
-     *
-     * @param gridJob Job is to create reduce task from.
-     * @param taskType Type of task - combine or reduce.
-     * @param taskNum Number of task in job.
-     * @param words Pairs of words and its counts.
-     * @return Context with mock output.
-     * @throws IgniteCheckedException If fails.
-     */
-    private GridHadoopTestTaskContext runTaskWithInput(GridHadoopV2Job gridJob, GridHadoopTaskType taskType,
-        int taskNum, String... words) throws IgniteCheckedException {
-        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
-
-        GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob);
-
-        for (int i = 0; i < words.length; i+=2) {
-            List<IntWritable> valList = new ArrayList<>();
-
-            for (int j = 0; j < Integer.parseInt(words[i + 1]); j++)
-                valList.add(new IntWritable(1));
-
-            ctx.mockInput().put(new Text(words[i]), valList);
-        }
-
-        ctx.run();
-
-        return ctx;
-    }
-
-    /**
-     * Tests reduce task execution.
-     *
-     * @throws Exception If fails.
-     */
-    public void testReduceTask() throws Exception {
-        GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
-
-        runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
-        runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
-
-        assertEquals(
-            "word1\t5\n" +
-            "word2\t10\n",
-            readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" +
-                getOutputFileNamePrefix() + "00000")
-        );
-
-        assertEquals(
-            "word3\t7\n" +
-            "word4\t15\n",
-            readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" +
-                getOutputFileNamePrefix() + "00001")
-        );
-    }
-
-    /**
-     * Tests combine task execution.
-     *
-     * @throws Exception If fails.
-     */
-    public void testCombinerTask() throws Exception {
-        GridHadoopV2Job gridJob = getHadoopJob("/", "/");
-
-        GridHadoopTestTaskContext ctx =
-            runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
-
-        assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput()));
-
-        ctx = runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15");
-
-        assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput()));
-    }
-
-    /**
-     * Runs chain of map-combine task on file block.
-     *
-     * @param fileBlock block of input file to be processed.
-     * @param gridJob Hadoop job implementation.
-     * @return Context of combine task with mock output.
-     * @throws IgniteCheckedException If fails.
-     */
-    private GridHadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, GridHadoopV2Job gridJob)
-        throws IgniteCheckedException {
-        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
-
-        GridHadoopTestTaskContext mapCtx = new GridHadoopTestTaskContext(taskInfo, gridJob);
-
-        mapCtx.run();
-
-        //Prepare input for combine
-        taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);
-
-        GridHadoopTestTaskContext combineCtx = new GridHadoopTestTaskContext(taskInfo, gridJob);
-
-        combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
-
-        combineCtx.run();
-
-        return combineCtx;
-    }
-
-    /**
-     * Tests all job in complex.
-     * Runs 2 chains of map-combine tasks and sends result into one reduce task.
-     *
-     * @throws Exception If fails.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public void testAllTasks() throws Exception {
-        IgfsPath inDir = new IgfsPath(PATH_INPUT);
-
-        igfs.mkdirs(inDir);
-
-        IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input");
-
-        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
-
-        generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70);
-
-        //Split file into two blocks
-        long fileLen = igfs.info(inFile).length();
-
-        Long l = fileLen / 2;
-
-        GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, l);
-        GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
-
-        GridHadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
-
-        GridHadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
-
-        GridHadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
-
-        //Prepare input for combine
-        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, gridJob.id(), 0, 0, null);
-
-        GridHadoopTestTaskContext reduceCtx = new GridHadoopTestTaskContext(taskInfo, gridJob);
-
-        reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
-        reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
-
-        reduceCtx.run();
-
-        reduceCtx.taskInfo(new GridHadoopTaskInfo(GridHadoopTaskType.COMMIT, gridJob.id(), 0, 0, null));
-
-        reduceCtx.run();
-
-        assertEquals(
-            "blue\t200\n" +
-            "green\t150\n" +
-            "red\t100\n" +
-            "yellow\t70\n",
-            readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000")
-        );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
deleted file mode 100644
index 15ac125..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1.
- */
-public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest {
-    /**
-     * Creates WordCount hadoop job for API v1.
-     *
-     * @param inFile Input file name for the job.
-     * @param outFile Output file name for the job.
-     * @return Hadoop job.
-     * @throws IOException If fails.
-     */
-    @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
-        JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile);
-
-        setupFileSystems(jobConf);
-
-        GridHadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
-
-        GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
-
-        return new GridHadoopV2Job(jobId, jobInfo, log);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getOutputFileNamePrefix() {
-        return "part-";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
deleted file mode 100644
index e48eb01..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.lib.output.*;
-import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2.
- */
-public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest {
-    /**
-     * Creates WordCount hadoop job for API v2.
-     *
-     * @param inFile Input file name for the job.
-     * @param outFile Output file name for the job.
-     * @return Hadoop job.
-     * @throws Exception if fails.
-     */
-    @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
-        Job job = Job.getInstance();
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        GridHadoopWordCount2.setTasksClasses(job, true, true, true);
-
-        Configuration conf = job.getConfiguration();
-
-        setupFileSystems(conf);
-
-        FileInputFormat.setInputPaths(job, new Path(inFile));
-        FileOutputFormat.setOutputPath(job, new Path(outFile));
-
-        job.setJarByClass(GridHadoopWordCount2.class);
-
-        Job hadoopJob = GridHadoopWordCount2.getJob(inFile, outFile);
-
-        GridHadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
-
-        GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
-
-        return new GridHadoopV2Job(jobId, jobInfo, log);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getOutputFileNamePrefix() {
-        return "part-r-";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java
deleted file mode 100644
index 5baa8cd..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.hadoop.planner.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Round-robin mr planner.
- */
-public class GridHadoopTestRoundRobinMrPlanner implements GridHadoopMapReducePlanner {
-    /** {@inheritDoc} */
-    @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top,
-        @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        if (top.isEmpty())
-            throw new IllegalArgumentException("Topology is empty");
-
-        // Has at least one element.
-        Iterator<ClusterNode> it = top.iterator();
-
-        Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>();
-
-        for (GridHadoopInputSplit block : job.input()) {
-            ClusterNode node = it.next();
-
-            Collection<GridHadoopInputSplit> nodeBlocks = mappers.get(node.id());
-
-            if (nodeBlocks == null) {
-                nodeBlocks = new ArrayList<>();
-
-                mappers.put(node.id(), nodeBlocks);
-            }
-
-            nodeBlocks.add(block);
-
-            if (!it.hasNext())
-                it = top.iterator();
-        }
-
-        int[] rdc = new int[job.info().reducers()];
-
-        for (int i = 0; i < rdc.length; i++)
-            rdc[i] = i;
-
-        return new GridHadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java
deleted file mode 100644
index 4e0aa9b..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Context for test purpose.
- */
-class GridHadoopTestTaskContext extends GridHadoopV2TaskContext {
-    /**
-     * Simple key-vale pair.
-     * @param <K> Key class.
-     * @param <V> Value class.
-     */
-    public static class Pair<K,V> {
-        /** Key */
-        private K key;
-
-        /** Value */
-        private V val;
-
-        /**
-         * @param key key.
-         * @param val value.
-         */
-        Pair(K key, V val) {
-            this.key = key;
-            this.val = val;
-        }
-
-        /**
-         * Getter of key.
-         * @return key.
-         */
-        K key() {
-            return key;
-        }
-
-        /**
-         * Getter of value.
-         * @return value.
-         */
-        V value() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return key + "," + val;
-        }
-    }
-
-    /** Mock output container- result data of task execution if it is not overridden. */
-    private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
-
-    /** Mock input container- input data if it is not overridden. */
-    private Map<Object,List> mockInput = new TreeMap<>();
-
-    /** Context output implementation to write data into mockOutput. */
-    private GridHadoopTaskOutput output = new GridHadoopTaskOutput() {
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) {
-            //Check of casting and extract/copy values
-            String strKey = new String(((Text)key).getBytes());
-            int intVal = ((IntWritable)val).get();
-
-            mockOutput().add(new Pair<>(strKey, intVal));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            throw new UnsupportedOperationException();
-        }
-    };
-
-    /** Context input implementation to read data from mockInput. */
-    private GridHadoopTaskInput input = new GridHadoopTaskInput() {
-        /** Iterator of keys and associated lists of values. */
-        Iterator<Map.Entry<Object, List>> iter;
-
-        /** Current key and associated value list. */
-        Map.Entry<Object, List> currEntry;
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (iter == null)
-                iter = mockInput().entrySet().iterator();
-
-            if (iter.hasNext())
-                currEntry = iter.next();
-            else
-                currEntry = null;
-
-            return currEntry != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return currEntry.getKey();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            return currEntry.getValue().iterator() ;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            throw new UnsupportedOperationException();
-        }
-    };
-
-    /**
-     * Getter of mock output container - result of task if it is not overridden.
-     *
-     * @return mock output.
-     */
-    public List<Pair<String, Integer>> mockOutput() {
-        return mockOutput;
-    }
-
-    /**
-     * Getter of mock input container- input data if it is not overridden.
-     *
-     * @return mock output.
-     */
-    public Map<Object, List> mockInput() {
-        return mockInput;
-    }
-
-    /**
-     * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects.
-     * The result is placed into mock input.
-     *
-     * @param flatData list of key-value pair.
-     */
-    public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
-        Text key = new Text();
-
-        for (GridHadoopTestTaskContext.Pair<String, Integer> pair : flatData) {
-            key.set(pair.key);
-            ArrayList<IntWritable> valList;
-
-            if (!mockInput.containsKey(key)) {
-                valList = new ArrayList<>();
-                mockInput.put(key, valList);
-                key = new Text();
-            }
-            else
-                valList = (ArrayList<IntWritable>) mockInput.get(key);
-            valList.add(new IntWritable(pair.value()));
-        }
-    }
-
-    /**
-     * @param taskInfo Task info.
-     * @param gridJob Grid Hadoop job.
-     */
-    public GridHadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException {
-        super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
-    }
-
-    /**
-     * Creates DataInput to read JobConf.
-     *
-     * @param job Job.
-     * @return DataInput with JobConf.
-     * @throws IgniteCheckedException If failed.
-     */
-    private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException {
-        JobConf jobConf = new JobConf();
-
-        for (Map.Entry<String, String> e : ((GridHadoopDefaultJobInfo)job.info()).properties().entrySet())
-            jobConf.set(e.getKey(), e.getValue());
-
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
-        try {
-            jobConf.write(new DataOutputStream(buf));
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopTaskOutput output() {
-        return output;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopTaskInput input() {
-        return input;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java
deleted file mode 100644
index cdbb809..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.junit.Assert.*;
-
-/**
- * Utility class for tests.
- */
-public class GridHadoopTestUtils {
-    /**
-     * Checks that job statistics file contains valid strings only.
-     *
-     * @param reader Buffered reader to get lines of job statistics.
-     * @return Amount of events.
-     * @throws IOException If failed.
-     */
-    public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException {
-        Collection<String> phases = new HashSet<>();
-
-        phases.add("submit");
-        phases.add("prepare");
-        phases.add("start");
-        phases.add("finish");
-        phases.add("requestId");
-        phases.add("responseId");
-
-        Collection<String> evtTypes = new HashSet<>();
-
-        evtTypes.add("JOB");
-        evtTypes.add("SETUP");
-        evtTypes.add("MAP");
-        evtTypes.add("SHUFFLE");
-        evtTypes.add("REDUCE");
-        evtTypes.add("COMBINE");
-        evtTypes.add("COMMIT");
-
-        long evtCnt = 0;
-        String line;
-
-        Map<Long, String> reduceNodes = new HashMap<>();
-
-        while((line = reader.readLine()) != null) {
-            String[] splitLine = line.split(":");
-
-            //Try parse timestamp
-            Long.parseLong(splitLine[1]);
-
-            String[] evt = splitLine[0].split(" ");
-
-            assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0]));
-
-            String phase;
-
-            if ("JOB".equals(evt[0]))
-                phase = evt[1];
-            else {
-                assertEquals(4, evt.length);
-                assertTrue("The node id is not defined", !F.isEmpty(evt[3]));
-
-                long taskNum = Long.parseLong(evt[1]);
-
-                if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) {
-                    String nodeId = reduceNodes.get(taskNum);
-
-                    if (nodeId == null)
-                        reduceNodes.put(taskNum, evt[3]);
-                    else
-                        assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]);
-                }
-
-                phase = evt[2];
-            }
-
-            assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase));
-
-            evtCnt++;
-        }
-
-        return evtCnt;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java
deleted file mode 100644
index b201614..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.serializer.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Self test of {@link GridHadoopV2Job}.
- */
-public class GridHadoopV2JobSelfTest extends GridHadoopAbstractSelfTest {
-    /** */
-    private static final String TEST_SERIALIZED_VALUE = "Test serialized value";
-
-    /**
-     * Custom serialization class that accepts {@link Writable}.
-     */
-    private static class CustomSerialization extends WritableSerialization {
-        /** {@inheritDoc} */
-        @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) {
-            return new Deserializer<Writable>() {
-                @Override public void open(InputStream in) { }
-
-                @Override public Writable deserialize(Writable writable) {
-                    return new Text(TEST_SERIALIZED_VALUE);
-                }
-
-                @Override public void close() { }
-            };
-        }
-    }
-
-    /**
-     * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration.
-     *
-     * @throws IgniteCheckedException If fails.
-     */
-    public void testCustomSerializationApplying() throws IgniteCheckedException {
-        JobConf cfg = new JobConf();
-
-        cfg.setMapOutputKeyClass(IntWritable.class);
-        cfg.setMapOutputValueClass(Text.class);
-        cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
-
-        GridHadoopJob job = new GridHadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
-
-        GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0,
-            null));
-
-        GridHadoopSerialization ser = taskCtx.keySerialization();
-
-        assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName());
-
-        DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-
-        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
-
-        ser = taskCtx.valueSerialization();
-
-        assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName());
-
-        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java
deleted file mode 100644
index 051d073..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.configuration.*;
-
-/**
- * Configuration validation tests.
- */
-public class GridHadoopValidationSelfTest extends GridHadoopAbstractSelfTest {
-    /** Peer class loading enabled flag. */
-    public boolean peerClassLoading;
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids(true);
-
-        peerClassLoading = false;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setPeerClassLoadingEnabled(peerClassLoading);
-
-        return cfg;
-    }
-
-    /**
-     * Ensure that Grid starts when all configuration parameters are valid.
-     *
-     * @throws Exception If failed.
-     */
-    public void testValid() throws Exception {
-        startGrids(1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
new file mode 100644
index 0000000..7fda532
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Abstract class for Hadoop tests.
+ */
+public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** REST port. */
+    protected static final int REST_PORT = 11212;
+
+    /** IGFS name. */
+    protected static final String igfsName = null;
+
+    /** IGFS meta cache name. */
+    protected static final String igfsMetaCacheName = "meta";
+
+    /** IGFS data cache name. */
+    protected static final String igfsDataCacheName = "data";
+
+    /** IGFS block size. */
+    protected static final int igfsBlockSize = 1024;
+
+    /** IGFS block group size. */
+    protected static final int igfsBlockGroupSize = 8;
+
+    /** REST port for the next configured node; starts at {@link #REST_PORT}. */
+    private int restPort = REST_PORT;
+
+    /** Initial classpath. */
+    private static String initCp;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // Add surefire classpath to regular classpath.
+        initCp = System.getProperty("java.class.path");
+
+        String surefireCp = System.getProperty("surefire.test.class.path");
+
+        if (surefireCp != null)
+            System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);
+
+        super.beforeTestsStarted();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        // Restore classpath.
+        System.setProperty("java.class.path", initCp);
+
+        initCp = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setHadoopConfiguration(hadoopConfiguration(gridName));
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        if (igfsEnabled()) {
+            cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration());
+
+            cfg.setFileSystemConfiguration(igfsConfiguration());
+        }
+
+        if (restEnabled()) {
+            ConnectorConfiguration clnCfg = new ConnectorConfiguration();
+
+            clnCfg.setPort(restPort++);
+
+            cfg.setConnectorConfiguration(clnCfg);
+        }
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setPeerClassLoadingEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Hadoop configuration.
+     */
+    public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = new HadoopConfiguration();
+
+        cfg.setMaxParallelTasks(3);
+
+        return cfg;
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    public FileSystemConfiguration igfsConfiguration() {
+        FileSystemConfiguration cfg = new FileSystemConfiguration();
+
+        cfg.setName(igfsName);
+        cfg.setBlockSize(igfsBlockSize);
+        cfg.setDataCacheName(igfsDataCacheName);
+        cfg.setMetaCacheName(igfsMetaCacheName);
+        cfg.setFragmentizerEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * @return IGFS meta cache configuration.
+     */
+    public CacheConfiguration metaCacheConfiguration() {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setName(igfsMetaCacheName);
+        cfg.setCacheMode(REPLICATED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cfg;
+    }
+
+    /**
+     * @return IGFS data cache configuration.
+     */
+    private CacheConfiguration dataCacheConfiguration() {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setName(igfsDataCacheName);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize));
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cfg;
+    }
+
+    /**
+     * @return {@code True} if IGFS is enabled on Hadoop nodes.
+     */
+    protected boolean igfsEnabled() {
+        return false;
+    }
+
+    /**
+     * @return {@code True} if REST is enabled on Hadoop nodes.
+     */
+    protected boolean restEnabled() {
+        return false;
+    }
+
+    /**
+     * @return Number of nodes to start.
+     */
+    protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @param cfg Config.
+     */
+    protected void setupFileSystems(Configuration cfg) {
+        cfg.set("fs.defaultFS", igfsScheme());
+        cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+        cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.
+            class.getName());
+
+        HadoopFileSystemsUtils.setupFileSystems(cfg);
+    }
+
+    /**
+     * @return IGFS scheme for test.
+     */
+    protected String igfsScheme() {
+        return "igfs://:" + getTestGridName(0) + "@/";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
new file mode 100644
index 0000000..1390982
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Abstract class for tests based on WordCount test job.
+ */
+public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest {
+    /** Input path. */
+    protected static final String PATH_INPUT = "/input";
+
+    /** Output path. */
+    protected static final String PATH_OUTPUT = "/output";
+
+    /** IGFS instance. */
+    protected IgfsEx igfs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        // Init cache by correct LocalFileSystem implementation
+        FileSystem.getLocal(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /**
+     * Generates test file.
+     *
+     * @param path File name.
+     * @param wordCounts Words and counts.
+     * @throws Exception If failed.
+     */
+    protected void generateTestFile(String path, Object... wordCounts) throws Exception {
+        List<String> wordsArr = new ArrayList<>();
+
+        //Generating
+        for (int i = 0; i < wordCounts.length; i += 2) {
+            String word = (String) wordCounts[i];
+            int cnt = (Integer) wordCounts[i + 1];
+
+            while (cnt-- > 0)
+                wordsArr.add(word);
+        }
+
+        //Shuffling
+        for (int i = 0; i < wordsArr.size(); i++) {
+            int j = (int)(Math.random() * wordsArr.size());
+
+            Collections.swap(wordsArr, i, j);
+        }
+
+        //Input file preparing
+        PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true));
+
+        int j = 0;
+
+        while (j < wordsArr.size()) {
+            int i = 5 + (int)(Math.random() * 5);
+
+            List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+            j += i;
+
+            testInputFileWriter.println(Joiner.on(' ').join(subList));
+        }
+
+        testInputFileWriter.close();
+    }
+
+    /**
+     * Reads whole text file and returns its lines sorted and joined into a single String.
+     *
+     * @param fileName Name of the file to read.
+     * @return Content of the file as String value.
+     * @throws Exception If could not read the file.
+     */
+    protected String readAndSortFile(String fileName) throws Exception {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))));
+
+        List<String> list = new ArrayList<>();
+
+        String line;
+
+        while ((line = reader.readLine()) != null)
+            list.add(line);
+
+        Collections.sort(list);
+
+        return Joiner.on('\n').join(list) + "\n";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
new file mode 100644
index 0000000..a3289cb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import junit.framework.*;
+import org.apache.hadoop.mapreduce.*;
+
+/**
+ *
+ */
+public class HadoopClassLoaderTest extends TestCase {
+    /** */
+    HadoopClassLoader ldr = new HadoopClassLoader(null);
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClassLoading() throws Exception {
+        assertNotSame(Test1.class, ldr.loadClass(Test1.class.getName()));
+        assertNotSame(Test2.class, ldr.loadClass(Test2.class.getName()));
+        assertSame(Test3.class, ldr.loadClass(Test3.class.getName()));
+    }
+
+//    public void testDependencySearch() {
+//        assertTrue(ldr.hasExternalDependencies(Test1.class.getName(), new HashSet<String>()));
+//        assertTrue(ldr.hasExternalDependencies(Test2.class.getName(), new HashSet<String>()));
+//    }
+
+    /**
+     *
+     */
+    private static class Test1 {
+        /** */
+        Test2 t2;
+
+        /** */
+        Job[][] jobs = new Job[4][4];
+    }
+
+    /**
+     *
+     */
+    private static abstract class Test2 {
+        /** */
+        abstract Test1 t1();
+    }
+
+    /**
+     *
+     */
+    private static class Test3 {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
new file mode 100644
index 0000000..33fa358
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.*;
+import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.nio.file.*;
+import java.util.*;
+
+/**
+ * Test of integration with Hadoop client via command line interface.
+ */
+public class HadoopCommandLineTest extends GridCommonAbstractTest {
+    /** IGFS instance. */
+    private IgfsEx igfs;
+
+    /** */
+    private static final String igfsName = "igfs";
+
+    /** */
+    private static File testWorkDir;
+
+    /** */
+    private static String hadoopHome;
+
+    /** */
+    private static String hiveHome;
+
+    /** */
+    private static File examplesJar;
+
+    /**
+     * Generates test file.
+     * @param path File name.
+     * @param wordCounts Words and counts.
+     * @throws Exception If failed.
+     */
+    private void generateTestFile(File path, Object... wordCounts) throws Exception {
+        List<String> wordsArr = new ArrayList<>();
+
+        //Generating
+        for (int i = 0; i < wordCounts.length; i += 2) {
+            String word = (String) wordCounts[i];
+            int cnt = (Integer) wordCounts[i + 1];
+
+            while (cnt-- > 0)
+                wordsArr.add(word);
+        }
+
+        //Shuffling
+        for (int i = 0; i < wordsArr.size(); i++) {
+            int j = (int)(Math.random() * wordsArr.size());
+
+            Collections.swap(wordsArr, i, j);
+        }
+
+        //Writing file
+        try (PrintWriter writer = new PrintWriter(path)) {
+            int j = 0;
+
+            while (j < wordsArr.size()) {
+                int i = 5 + (int)(Math.random() * 5);
+
+                List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+                j += i;
+
+                writer.println(Joiner.on(' ').join(subList));
+            }
+
+            writer.flush();
+        }
+    }
+
+    /**
+     * Generates two data files to be joined with Hive.
+     *
+     * @throws FileNotFoundException If failed.
+     */
+    private void generateHiveTestFiles() throws FileNotFoundException {
+        try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a"));
+             PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) {
+            char sep = '\t';
+
+            int idB = 0;
+            int idA = 0;
+            int v = 1000;
+
+            for (int i = 0; i < 1000; i++) {
+                writerA.print(idA++);
+                writerA.print(sep);
+                writerA.println(idB);
+
+                writerB.print(idB++);
+                writerB.print(sep);
+                writerB.println(v += 2);
+
+                writerB.print(idB++);
+                writerB.print(sep);
+                writerB.println(v += 2);
+            }
+
+            writerA.flush();
+            writerB.flush();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        hiveHome = IgniteSystemProperties.getString("HIVE_HOME");
+
+        assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));
+
+        hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");
+
+        assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));
+
+        String mapredHome = hadoopHome + "/share/hadoop/mapreduce";
+
+        File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
+            @Override public boolean accept(File pathname) {
+                return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
+                    pathname.getName().endsWith(".jar");
+            }
+        });
+
+        assertEquals("Invalid hadoop distribution.", 1, fileList.length);
+
+        examplesJar = fileList[0];
+
+        testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();
+
+        U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);
+
+        File srcFile = U.resolveIgnitePath("docs/mapred-site.ignite.xml");
+        File dstFile = new File(testWorkDir, "mapred-site.xml");
+
+        try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
+             PrintWriter out = new PrintWriter(dstFile)) {
+            String line;
+
+            while ((line = in.readLine()) != null) {
+                if (line.startsWith("</configuration>"))
+                    out.println(
+                        "    <property>\n" +
+                        "        <name>" + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
+                        "        <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" +
+                        "    </property>\n");
+
+                out.println(line);
+            }
+
+            out.flush();
+        }
+
+        generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);
+
+        generateHiveTestFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        U.delete(testWorkDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /**
+     * Creates the process builder with appropriate environment to run Hadoop CLI.
+     *
+     * @return Process builder.
+     */
+    private ProcessBuilder createProcessBuilder() {
+        String sep = ":";
+
+        String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
+            HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
+            ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+        ProcessBuilder res = new ProcessBuilder();
+
+        res.environment().put("HADOOP_HOME", hadoopHome);
+        res.environment().put("HADOOP_CLASSPATH", ggClsPath);
+        res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());
+
+        res.redirectErrorStream(true);
+
+        return res;
+    }
+
+    /**
+     * Waits for process exit and prints its output.
+     *
+     * @param proc Process.
+     * @return Exit code.
+     * @throws Exception If failed.
+     */
+    private int watchProcess(Process proc) throws Exception {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+        String line;
+
+        while ((line = reader.readLine()) != null)
+            log().info(line);
+
+        return proc.waitFor();
+    }
+
+    /**
+     * Executes Hadoop command line tool.
+     *
+     * @param args Arguments for Hadoop command line tool.
+     * @return Process exit code.
+     * @throws Exception If failed.
+     */
+    private int executeHadoopCmd(String... args) throws Exception {
+        ProcessBuilder procBuilder = createProcessBuilder();
+
+        List<String> cmd = new ArrayList<>();
+
+        cmd.add(hadoopHome + "/bin/hadoop");
+        cmd.addAll(Arrays.asList(args));
+
+        procBuilder.command(cmd);
+
+        log().info("Execute: " + procBuilder.command());
+
+        return watchProcess(procBuilder.start());
+    }
+
+    /**
+     * Executes Hive query.
+     *
+     * @param qry Query.
+     * @return Process exit code.
+     * @throws Exception If failed.
+     */
+    private int executeHiveQuery(String qry) throws Exception {
+        ProcessBuilder procBuilder = createProcessBuilder();
+
+        List<String> cmd = new ArrayList<>();
+
+        procBuilder.command(cmd);
+
+        cmd.add(hiveHome + "/bin/hive");
+
+        cmd.add("--hiveconf");
+        cmd.add("hive.rpc.query.plan=true");
+
+        cmd.add("--hiveconf");
+        cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
+            "databaseName=metastore_db;create=true");
+
+        cmd.add("-e");
+        cmd.add(qry);
+
+        procBuilder.command(cmd);
+
+        log().info("Execute: " + procBuilder.command());
+
+        return watchProcess(procBuilder.start());
+    }
+
+    /**
+     * Tests Hadoop command line integration: lists and creates directories, uploads test data,
+     * runs the {@code wordcount} example and verifies both the job statistics file and the job output.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHadoopCommandLine() throws Exception {
+        assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));
+
+        assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));
+
+        assertTrue(igfs.exists(new IgfsPath("/input/test-data")));
+
+        assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));
+
+        IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");
+
+        assertTrue(igfs.exists(path));
+
+        IgfsPath jobStatPath = null;
+
+        // The user directory must contain exactly one entry; it is used as the statistics location.
+        for (IgfsPath jobPath : igfs.listPaths(path)) {
+            assertNull(jobStatPath);
+
+            jobStatPath = jobPath;
+        }
+
+        // Fail with a clear message instead of NullPointerException if the directory is empty.
+        assertNotNull("No entries found under " + path, jobStatPath);
+
+        File locStatFile = new File(testWorkDir, "performance");
+
+        assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance",
+            locStatFile.toString()));
+
+        long evtCnt;
+
+        // Close the local statistics file reader even if the check throws.
+        try (BufferedReader statReader = new BufferedReader(new FileReader(locStatFile))) {
+            evtCnt = HadoopTestUtils.simpleCheckJobStatFile(statReader);
+        }
+
+        assertTrue(evtCnt >= 22); // It's the minimum number of events for a job with a combiner.
+
+        assertTrue(igfs.exists(new IgfsPath("/output")));
+
+        List<String> res = new ArrayList<>();
+
+        // Close the IGFS output stream once all lines have been read.
+        try (BufferedReader in = new BufferedReader(
+            new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000"))))) {
+            String line;
+
+            while ((line = in.readLine()) != null)
+                res.add(line);
+        }
+
+        Collections.sort(res);
+
+        assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
+    }
+
+    /**
+     * Runs query and checks the result.
+     * <p>
+     * The query output is materialized into table {@code result} stored as a text file at {@code /result},
+     * then file {@code /result/000000_0} is read from IGFS and compared with the expected text.
+     *
+     * @param expRes Expected result.
+     * @param qry Query.
+     * @throws Exception If failed.
+     */
+    private void checkQuery(String expRes, String qry) throws Exception {
+        assertEquals(0, executeHiveQuery("drop table if exists result"));
+
+        assertEquals(0, executeHiveQuery(
+            "create table result " +
+            "row format delimited fields terminated by ' ' " +
+            "stored as textfile " +
+            "location '/result' as " + qry
+        ));
+
+        byte[] buf;
+
+        // Close the IGFS stream even if reading or an assertion fails.
+        try (IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0"))) {
+            buf = new byte[(int) in.length()];
+
+            int off = 0;
+
+            // A single read() call may return fewer bytes than requested, so read until the buffer is full.
+            while (off < buf.length) {
+                int cnt = in.read(buf, off, buf.length - off);
+
+                assertTrue("Unexpected end of result file at offset " + off, cnt >= 0);
+
+                off += cnt;
+            }
+        }
+
+        assertEquals(expRes, new String(buf));
+    }
+
+    /**
+     * Tests Hive integration.
+     */
+    public void testHiveCommandLine() throws Exception {
+        assertEquals(0, executeHiveQuery(
+            "create table table_a (" +
+                "id_a int," +
+                "id_b int" +
+            ") " +
+            "row format delimited fields terminated by '\\t'" +
+            "stored as textfile " +
+            "location '/table-a'"
+        ));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));
+
+        assertEquals(0, executeHiveQuery(
+            "create table table_b (" +
+                "id_b int," +
+                "rndv int" +
+            ") " +
+            "row format delimited fields terminated by '\\t'" +
+            "stored as textfile " +
+            "location '/table-b'"
+        ));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));
+
+        checkQuery(
+            "0 0\n" +
+            "1 2\n" +
+            "2 4\n" +
+            "3 6\n" +
+            "4 8\n" +
+            "5 10\n" +
+            "6 12\n" +
+            "7 14\n" +
+            "8 16\n" +
+            "9 18\n",
+            "select * from table_a order by id_a limit 10"
+        );
+
+        checkQuery("2000\n", "select count(id_b) from table_b");
+
+        checkQuery(
+            "250 500 2002\n" +
+            "251 502 2006\n" +
+            "252 504 2010\n" +
+            "253 506 2014\n" +
+            "254 508 2018\n" +
+            "255 510 2022\n" +
+            "256 512 2026\n" +
+            "257 514 2030\n" +
+            "258 516 2034\n" +
+            "259 518 2038\n",
+            "select a.id_a, a.id_b, b.rndv" +
+            " from table_a a" +
+            " inner join table_b b on a.id_b = b.id_b" +
+            " where b.rndv > 2000" +
+            " order by a.id_a limit 10"
+        );
+
+        checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
+    }
+}


Mime
View raw message