ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [22/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (3).
Date Tue, 03 Mar 2015 13:08:38 GMT
# IGNITE-386: WIP on internal namings (3).


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1c4b00d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1c4b00d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1c4b00d4

Branch: refs/heads/ignite-386
Commit: 1c4b00d4131fef7e05a661516b2b2f8fd480bb2e
Parents: 288709a
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Mar 3 16:01:21 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Mar 3 16:01:21 2015 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    |    4 +-
 .../processors/hadoop/HadoopCounters.java       |    4 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java |    2 +-
 .../internal/processors/hadoop/HadoopUtils.java |    6 +-
 .../taskexecutor/GridHadoopRunnableTask.java    |  268 ----
 .../HadoopEmbeddedTaskExecutor.java             |   16 +-
 .../taskexecutor/HadoopExecutorService.java     |    4 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  268 ++++
 .../external/HadoopExternalTaskExecutor.java    |    8 +-
 .../child/GridHadoopChildProcessRunner.java     |  440 ------
 .../child/GridHadoopExternalProcessStarter.java |  296 ----
 .../child/HadoopChildProcessRunner.java         |  440 ++++++
 .../child/HadoopExternalProcessStarter.java     |  296 ++++
 .../GridHadoopAbstractCommunicationClient.java  |   96 --
 .../GridHadoopCommunicationClient.java          |   72 -
 .../GridHadoopExternalCommunication.java        | 1431 ------------------
 .../GridHadoopHandshakeTimeoutException.java    |   42 -
 .../GridHadoopIpcToNioAdapter.java              |  239 ---
 .../GridHadoopMarshallerFilter.java             |   84 -
 .../GridHadoopMessageListener.java              |   39 -
 .../GridHadoopTcpNioCommunicationClient.java    |   99 --
 .../HadoopAbstractCommunicationClient.java      |   96 ++
 .../HadoopCommunicationClient.java              |   72 +
 .../HadoopExternalCommunication.java            | 1431 ++++++++++++++++++
 .../HadoopHandshakeTimeoutException.java        |   42 +
 .../communication/HadoopIpcToNioAdapter.java    |  239 +++
 .../communication/HadoopMarshallerFilter.java   |   84 +
 .../communication/HadoopMessageListener.java    |   39 +
 .../HadoopTcpNioCommunicationClient.java        |   99 ++
 .../hadoop/v1/GridHadoopV1CleanupTask.java      |   62 -
 .../hadoop/v1/GridHadoopV1Counter.java          |  105 --
 .../hadoop/v1/GridHadoopV1MapTask.java          |  111 --
 .../hadoop/v1/GridHadoopV1OutputCollector.java  |  130 --
 .../hadoop/v1/GridHadoopV1Partitioner.java      |   44 -
 .../hadoop/v1/GridHadoopV1ReduceTask.java       |   92 --
 .../hadoop/v1/GridHadoopV1Reporter.java         |   79 -
 .../hadoop/v1/GridHadoopV1SetupTask.java        |   56 -
 .../hadoop/v1/GridHadoopV1Splitter.java         |   97 --
 .../processors/hadoop/v1/GridHadoopV1Task.java  |   95 --
 .../hadoop/v1/HadoopV1CleanupTask.java          |   62 +
 .../processors/hadoop/v1/HadoopV1Counter.java   |  105 ++
 .../processors/hadoop/v1/HadoopV1MapTask.java   |  111 ++
 .../hadoop/v1/HadoopV1OutputCollector.java      |  130 ++
 .../hadoop/v1/HadoopV1Partitioner.java          |   44 +
 .../hadoop/v1/HadoopV1ReduceTask.java           |   92 ++
 .../processors/hadoop/v1/HadoopV1Reporter.java  |   79 +
 .../processors/hadoop/v1/HadoopV1SetupTask.java |   56 +
 .../processors/hadoop/v1/HadoopV1Splitter.java  |   97 ++
 .../processors/hadoop/v1/HadoopV1Task.java      |   95 ++
 .../hadoop/v2/GridHadoopExternalSplit.java      |   87 --
 .../hadoop/v2/GridHadoopNativeCodeLoader.java   |   74 -
 .../v2/GridHadoopSerializationWrapper.java      |  133 --
 .../v2/GridHadoopShutdownHookManager.java       |   96 --
 .../hadoop/v2/GridHadoopSplitWrapper.java       |  118 --
 .../hadoop/v2/GridHadoopV2CleanupTask.java      |   73 -
 .../hadoop/v2/GridHadoopV2Context.java          |  230 ---
 .../hadoop/v2/GridHadoopV2Counter.java          |   87 --
 .../processors/hadoop/v2/GridHadoopV2Job.java   |  280 ----
 .../v2/GridHadoopV2JobResourceManager.java      |  305 ----
 .../hadoop/v2/GridHadoopV2MapTask.java          |  109 --
 .../hadoop/v2/GridHadoopV2Partitioner.java      |   44 -
 .../hadoop/v2/GridHadoopV2ReduceTask.java       |   88 --
 .../hadoop/v2/GridHadoopV2SetupTask.java        |   66 -
 .../hadoop/v2/GridHadoopV2Splitter.java         |  105 --
 .../processors/hadoop/v2/GridHadoopV2Task.java  |  181 ---
 .../hadoop/v2/GridHadoopV2TaskContext.java      |  443 ------
 .../v2/GridHadoopWritableSerialization.java     |   74 -
 .../hadoop/v2/HadoopExternalSplit.java          |   87 ++
 .../hadoop/v2/HadoopNativeCodeLoader.java       |   74 +
 .../hadoop/v2/HadoopSerializationWrapper.java   |  133 ++
 .../hadoop/v2/HadoopShutdownHookManager.java    |   96 ++
 .../hadoop/v2/HadoopSplitWrapper.java           |  118 ++
 .../hadoop/v2/HadoopV2CleanupTask.java          |   73 +
 .../processors/hadoop/v2/HadoopV2Context.java   |  230 +++
 .../processors/hadoop/v2/HadoopV2Counter.java   |   87 ++
 .../processors/hadoop/v2/HadoopV2Job.java       |  280 ++++
 .../hadoop/v2/HadoopV2JobResourceManager.java   |  305 ++++
 .../processors/hadoop/v2/HadoopV2MapTask.java   |  109 ++
 .../hadoop/v2/HadoopV2Partitioner.java          |   44 +
 .../hadoop/v2/HadoopV2ReduceTask.java           |   88 ++
 .../processors/hadoop/v2/HadoopV2SetupTask.java |   66 +
 .../processors/hadoop/v2/HadoopV2Splitter.java  |  105 ++
 .../processors/hadoop/v2/HadoopV2Task.java      |  181 +++
 .../hadoop/v2/HadoopV2TaskContext.java          |  443 ++++++
 .../hadoop/v2/HadoopWritableSerialization.java  |   74 +
 .../GridHadoopSerializationWrapperSelfTest.java |   74 -
 .../hadoop/GridHadoopSplitWrapperSelfTest.java  |   68 -
 .../hadoop/GridHadoopTasksAllVersionsTest.java  |   30 +-
 .../hadoop/GridHadoopTasksV1Test.java           |    4 +-
 .../hadoop/GridHadoopTasksV2Test.java           |    4 +-
 .../hadoop/GridHadoopTestTaskContext.java       |  219 ---
 .../hadoop/GridHadoopV2JobSelfTest.java         |   88 --
 .../HadoopSerializationWrapperSelfTest.java     |   74 +
 .../hadoop/HadoopSplitWrapperSelfTest.java      |   68 +
 .../hadoop/HadoopTestTaskContext.java           |  219 +++
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   88 ++
 .../collections/GridHadoopAbstractMapTest.java  |    4 +-
 ...GridHadoopExternalCommunicationSelfTest.java |  209 ---
 .../HadoopExternalCommunicationSelfTest.java    |  209 +++
 .../testsuites/IgniteHadoopTestSuite.java       |    8 +-
 100 files changed, 7275 insertions(+), 7275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 6915d17..2f484d8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -93,9 +93,9 @@ public class HadoopClassLoader extends URLClassLoader {
         try {
             if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
                 if (name.endsWith(".util.ShutdownHookManager"))  // Dirty hack to get rid of Hadoop shutdown hooks.
-                    return loadFromBytes(name, GridHadoopShutdownHookManager.class.getName());
+                    return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
                 else if (name.endsWith(".util.NativeCodeLoader"))
-                    return loadFromBytes(name, GridHadoopNativeCodeLoader.class.getName());
+                    return loadFromBytes(name, HadoopNativeCodeLoader.class.getName());
 
                 return loadClassExplicitly(name, resolve);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
index ad699ec..3482640 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
@@ -186,7 +186,7 @@ public class HadoopCounters extends Counters {
 
         for (HadoopLongCounter counter : cntrs.values()) {
             if (grpName.equals(counter.group()))
-                grpCounters.add(new GridHadoopV2Counter(counter));
+                grpCounters.add(new HadoopV2Counter(counter));
         }
 
         return grpCounters.iterator();
@@ -211,6 +211,6 @@ public class HadoopCounters extends Counters {
             cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
         }
 
-        return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr);
+        return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 370b82d..2f44778 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -91,7 +91,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
                     if ((jobCls0 = jobCls) == null) {
                         HadoopClassLoader ldr = new HadoopClassLoader(null);
 
-                        jobCls = jobCls0 = ldr.loadClass(GridHadoopV2Job.class.getName());
+                        jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 46594ce..62b5a98 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -65,7 +65,7 @@ public class HadoopUtils {
      * @param hosts Hosts.
      * @throws IOException If failed.
      */
-    public static GridHadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
+    public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
         ByteArrayOutputStream arr = new ByteArrayOutputStream();
         ObjectOutput out = new ObjectOutputStream(arr);
 
@@ -75,7 +75,7 @@ public class HadoopUtils {
 
         out.flush();
 
-        return new GridHadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
+        return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
     }
 
     /**
@@ -84,7 +84,7 @@ public class HadoopUtils {
      * @param o Wrapper.
      * @return Split.
      */
-    public static Object unwrapSplit(GridHadoopSplitWrapper o) {
+    public static Object unwrapSplit(HadoopSplitWrapper o) {
         try {
             Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
deleted file mode 100644
index 1ce7d4a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
-
-/**
- * Runnable task.
- */
-public abstract class GridHadoopRunnableTask implements Callable<Void> {
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final GridHadoopJob job;
-
-    /** Task to run. */
-    private final GridHadoopTaskInfo info;
-
-    /** Submit time. */
-    private final long submitTs = U.currentTimeMillis();
-
-    /** Execution start timestamp. */
-    private long execStartTs;
-
-    /** Execution end timestamp. */
-    private long execEndTs;
-
-    /** */
-    private HadoopMultimap combinerInput;
-
-    /** */
-    private volatile GridHadoopTaskContext ctx;
-
-    /** Set if task is to cancelling. */
-    private volatile boolean cancelled;
-
-    /** Node id. */
-    private UUID nodeId;
-
-    /**
-     * @param log Log.
-     * @param job Job.
-     * @param mem Memory.
-     * @param info Task info.
-     * @param nodeId Node id.
-     */
-    protected GridHadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info,
-        UUID nodeId) {
-        this.nodeId = nodeId;
-        this.log = log.getLogger(GridHadoopRunnableTask.class);
-        this.job = job;
-        this.mem = mem;
-        this.info = info;
-    }
-
-    /**
-     * @return Wait time.
-     */
-    public long waitTime() {
-        return execStartTs - submitTs;
-    }
-
-    /**
-     * @return Execution time.
-     */
-    public long executionTime() {
-        return execEndTs - execStartTs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void call() throws IgniteCheckedException {
-        execStartTs = U.currentTimeMillis();
-
-        Throwable err = null;
-
-        HadoopTaskState state = HadoopTaskState.COMPLETED;
-
-        HadoopPerformanceCounter perfCntr = null;
-
-        try {
-            ctx = job.getTaskContext(info);
-
-            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
-
-            perfCntr.onTaskSubmit(info, submitTs);
-            perfCntr.onTaskPrepare(info, execStartTs);
-
-            ctx.prepareTaskEnvironment();
-
-            runTask(perfCntr);
-
-            if (info.type() == MAP && job.info().hasCombiner()) {
-                ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
-
-                try {
-                    runTask(perfCntr);
-                }
-                finally {
-                    ctx.taskInfo(info);
-                }
-            }
-        }
-        catch (HadoopTaskCancelledException ignored) {
-            state = HadoopTaskState.CANCELED;
-        }
-        catch (Throwable e) {
-            state = HadoopTaskState.FAILED;
-            err = e;
-
-            U.error(log, "Task execution failed.", e);
-        }
-        finally {
-            execEndTs = U.currentTimeMillis();
-
-            if (perfCntr != null)
-                perfCntr.onTaskFinish(info, execEndTs);
-
-            onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters()));
-
-            if (combinerInput != null)
-                combinerInput.close();
-
-            if (ctx != null)
-                ctx.cleanupTaskEnvironment();
-        }
-
-        return null;
-    }
-
-    /**
-     * @param perfCntr Performance counter.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        try (GridHadoopTaskOutput out = createOutputInternal(ctx);
-             GridHadoopTaskInput in = createInputInternal(ctx)) {
-
-            ctx.input(in);
-            ctx.output(out);
-
-            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
-
-            ctx.run();
-        }
-    }
-
-    /**
-     * Cancel the executed task.
-     */
-    public void cancel() {
-        cancelled = true;
-
-        if (ctx != null)
-            ctx.cancel();
-    }
-
-    /**
-     * @param status Task status.
-     */
-    protected abstract void onTaskFinished(HadoopTaskStatus status);
-
-    /**
-     * @param ctx Task context.
-     * @return Task input.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-        switch (ctx.taskInfo().type()) {
-            case SETUP:
-            case MAP:
-            case COMMIT:
-            case ABORT:
-                return null;
-
-            case COMBINE:
-                assert combinerInput != null;
-
-                return combinerInput.input(ctx);
-
-            default:
-                return createInput(ctx);
-        }
-    }
-
-    /**
-     * @param ctx Task context.
-     * @return Input.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task info.
-     * @return Output.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task info.
-     * @return Task output.
-     * @throws IgniteCheckedException If failed.
-     */
-    private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-        switch (ctx.taskInfo().type()) {
-            case SETUP:
-            case REDUCE:
-            case COMMIT:
-            case ABORT:
-                return null;
-
-            case MAP:
-                if (job.info().hasCombiner()) {
-                    assert combinerInput == null;
-
-                    combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
-                        new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
-                        new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
-
-                    return combinerInput.startAdding(ctx);
-                }
-
-            default:
-                return createOutput(ctx);
-        }
-    }
-
-    /**
-     * @return Task info.
-     */
-    public GridHadoopTaskInfo taskInfo() {
-        return info;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
index 934ff35..e217c57 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -35,7 +35,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
     private HadoopJobTracker jobTracker;
 
     /** */
-    private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
+    private final ConcurrentMap<GridHadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
 
     /** Executor service to run tasks. */
     private HadoopExecutorService exec;
@@ -74,22 +74,22 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
             log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
                 ", tasksCnt=" + tasks.size() + ']');
 
-        Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id());
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id());
 
         if (executedTasks == null) {
             executedTasks = new GridConcurrentHashSet<>();
 
-            Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
+            Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
 
             assert extractedCol == null;
         }
 
-        final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks;
+        final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks;
 
         for (final GridHadoopTaskInfo info : tasks) {
             assert info != null;
 
-            GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+            HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
                 ctx.localNodeId()) {
                 @Override protected void onTaskFinished(HadoopTaskStatus status) {
                     if (log.isDebugEnabled())
@@ -127,10 +127,10 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
      * @param jobId Job ID to cancel.
      */
     @Override public void cancelTasks(GridHadoopJobId jobId) {
-        Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId);
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
 
         if (executedTasks != null) {
-            for (GridHadoopRunnableTask task : executedTasks)
+            for (HadoopRunnableTask task : executedTasks)
                 task.cancel();
         }
     }
@@ -138,7 +138,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
     /** {@inheritDoc} */
     @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
         if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
-            Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
+            Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
 
             assert executedTasks == null || executedTasks.isEmpty();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
index 19f903f..d89d7d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
@@ -169,8 +169,8 @@ public class HadoopExecutorService {
     private void startThread(final Callable<?> task) {
         String workerName;
 
-        if (task instanceof GridHadoopRunnableTask) {
-            final GridHadoopTaskInfo i = ((GridHadoopRunnableTask)task).taskInfo();
+        if (task instanceof HadoopRunnableTask) {
+            final GridHadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
 
             workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
new file mode 100644
index 0000000..5b10d6f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
+
+/**
+ * Runnable task.
+ */
+public abstract class HadoopRunnableTask implements Callable<Void> {
+    /** */
+    private final GridUnsafeMemory mem;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final GridHadoopJob job;
+
+    /** Task to run. */
+    private final GridHadoopTaskInfo info;
+
+    /** Submit time. */
+    private final long submitTs = U.currentTimeMillis();
+
+    /** Execution start timestamp. */
+    private long execStartTs;
+
+    /** Execution end timestamp. */
+    private long execEndTs;
+
+    /** */
+    private HadoopMultimap combinerInput;
+
+    /** */
+    private volatile GridHadoopTaskContext ctx;
+
+    /** Set when the task is being cancelled. */
+    private volatile boolean cancelled;
+
+    /** Node id. */
+    private UUID nodeId;
+
+    /**
+     * @param log Log.
+     * @param job Job.
+     * @param mem Memory.
+     * @param info Task info.
+     * @param nodeId Node id.
+     */
+    protected HadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info,
+        UUID nodeId) {
+        this.nodeId = nodeId;
+        this.log = log.getLogger(HadoopRunnableTask.class);
+        this.job = job;
+        this.mem = mem;
+        this.info = info;
+    }
+
+    /**
+     * @return Wait time.
+     */
+    public long waitTime() {
+        return execStartTs - submitTs;
+    }
+
+    /**
+     * @return Execution time.
+     */
+    public long executionTime() {
+        return execEndTs - execStartTs;
+    }
+
+    /**
+     * Runs the task and reports its final status through {@link #onTaskFinished(HadoopTaskStatus)}.
+     * <p>
+     * For a {@code MAP} task of a job that has a combiner, the {@code COMBINE} step is executed right after
+     * the map step using the same task context. Anything thrown by the task body is not propagated:
+     * cancellation turns into the {@code CANCELED} state, any other throwable is logged and turns into the
+     * {@code FAILED} state.
+     *
+     * @return Always {@code null}.
+     * @throws IgniteCheckedException If the finalization performed after the task body fails.
+     */
+    @Override public Void call() throws IgniteCheckedException {
+        execStartTs = U.currentTimeMillis();
+
+        Throwable err = null;
+
+        HadoopTaskState state = HadoopTaskState.COMPLETED;
+
+        HadoopPerformanceCounter perfCntr = null;
+
+        try {
+            ctx = job.getTaskContext(info);
+
+            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
+
+            perfCntr.onTaskSubmit(info, submitTs);
+            perfCntr.onTaskPrepare(info, execStartTs);
+
+            ctx.prepareTaskEnvironment();
+
+            runTask(perfCntr);
+
+            if (info.type() == MAP && job.info().hasCombiner()) {
+                // Temporarily switch the context to a COMBINE task with the same job ID, number and attempt.
+                ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+
+                try {
+                    runTask(perfCntr);
+                }
+                finally {
+                    // Restore the original task info whether or not the combine step succeeded.
+                    ctx.taskInfo(info);
+                }
+            }
+        }
+        catch (HadoopTaskCancelledException ignored) {
+            // Cancellation is an expected outcome, not a failure: no error is recorded.
+            state = HadoopTaskState.CANCELED;
+        }
+        catch (Throwable e) {
+            state = HadoopTaskState.FAILED;
+            err = e;
+
+            U.error(log, "Task execution failed.", e);
+        }
+        finally {
+            execEndTs = U.currentTimeMillis();
+
+            try {
+                if (perfCntr != null)
+                    perfCntr.onTaskFinish(info, execEndTs);
+
+                // Context may be null if it could not be obtained at the very beginning.
+                onTaskFinished(new HadoopTaskStatus(state, err, ctx == null ? null : ctx.counters()));
+            }
+            finally {
+                // Release resources even if the notification above throws.
+                try {
+                    if (combinerInput != null)
+                        combinerInput.close();
+                }
+                finally {
+                    if (ctx != null)
+                        ctx.cleanupTaskEnvironment();
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Executes the task currently described by the context, unless cancellation has already been requested.
+     * The created output and input are closed automatically when the run completes or fails.
+     *
+     * @param perfCntr Performance counter that receives the task start notification.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        // Output is created before input; either of them may be null depending on the task type.
+        try (GridHadoopTaskOutput taskOut = createOutputInternal(ctx);
+             GridHadoopTaskInput taskIn = createInputInternal(ctx)) {
+            ctx.input(taskIn);
+            ctx.output(taskOut);
+
+            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
+
+            ctx.run();
+        }
+    }
+
+    /**
+     * Requests cancellation: raises the cancelled flag and, if a task context is already available,
+     * forwards the cancellation to it.
+     */
+    public void cancel() {
+        cancelled = true;
+
+        if (ctx == null)
+            return;
+
+        ctx.cancel();
+    }
+
+    /**
+     * Callback that receives the final status of the task once its execution attempt is over.
+     *
+     * @param status Task status.
+     */
+    protected abstract void onTaskFinished(HadoopTaskStatus status);
+
+    /**
+     * Picks the input for the task type currently set in the context: none for setup, map, commit and abort
+     * tasks, the input obtained from {@code combinerInput} for combine tasks, and
+     * {@link #createInput(GridHadoopTaskContext)} for everything else.
+     *
+     * @param ctx Task context.
+     * @return Task input, possibly {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+        GridHadoopTaskInput res;
+
+        switch (ctx.taskInfo().type()) {
+            case COMBINE:
+                assert combinerInput != null;
+
+                res = combinerInput.input(ctx);
+
+                break;
+
+            case SETUP:
+            case MAP:
+            case COMMIT:
+            case ABORT:
+                res = null;
+
+                break;
+
+            default:
+                res = createInput(ctx);
+        }
+
+        return res;
+    }
+
+    /**
+     * Creates the task input for task types other than setup, map, commit, abort and combine.
+     *
+     * @param ctx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * Creates the task output. Used for map tasks of jobs without a combiner and for any task type other
+     * than setup, reduce, commit and abort.
+     *
+     * @param ctx Task context.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * Picks the output for the task type currently set in the context. Setup, reduce, commit and abort tasks
+     * have no output. A map task of a job with a combiner writes into a freshly created {@code combinerInput}
+     * collection; every other case is delegated to {@link #createOutput(GridHadoopTaskContext)}.
+     *
+     * @param ctx Task context.
+     * @return Task output, possibly {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case REDUCE:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            case MAP:
+                // Without a combiner a map task gets the same output as the default case.
+                if (!job.info().hasCombiner())
+                    return createOutput(ctx);
+
+                assert combinerInput == null;
+
+                if (get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false))
+                    combinerInput = new HadoopHashMultimap(job.info(), mem,
+                        get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024));
+                else
+                    combinerInput = new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
+
+                return combinerInput.startAdding(ctx);
+
+            default:
+                return createOutput(ctx);
+        }
+    }
+
+    /**
+     * Gets the info of the task this runnable was created for.
+     *
+     * @return Task info.
+     */
+    public GridHadoopTaskInfo taskInfo() {
+        return this.info;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index 10ad648..f05761e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -64,7 +64,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
     private String pathSep;
 
     /** Hadoop external communication. */
-    private GridHadoopExternalCommunication comm;
+    private HadoopExternalCommunication comm;
 
     /** Starting processes. */
     private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
@@ -90,7 +90,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
 
         initJavaCommand();
 
-        comm = new GridHadoopExternalCommunication(
+        comm = new HadoopExternalCommunication(
             ctx.localNodeId(),
             UUID.randomUUID(),
             ctx.kernalContext().config().getMarshaller(),
@@ -508,7 +508,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
         cmd.addAll(startMeta.jvmOptions());
         cmd.add("-cp");
         cmd.add(buildClasspath(startMeta.classpath()));
-        cmd.add(GridHadoopExternalProcessStarter.class.getName());
+        cmd.add(HadoopExternalProcessStarter.class.getName());
         cmd.add("-cpid");
         cmd.add(String.valueOf(childProcId));
         cmd.add("-ppid");
@@ -635,7 +635,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
     /**
      *
      */
-    private class MessageListener implements GridHadoopMessageListener {
+    private class MessageListener implements HadoopMessageListener {
         /** {@inheritDoc} */
         @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
             if (!busyLock.tryReadLock())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
deleted file mode 100644
index 21552e2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
-
-/**
- * Hadoop process base.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public class GridHadoopChildProcessRunner {
-    /** Node process descriptor. */
-    private HadoopProcessDescriptor nodeDesc;
-
-    /** Message processing executor service. */
-    private ExecutorService msgExecSvc;
-
-    /** Task executor service. */
-    private HadoopExecutorService execSvc;
-
-    /** */
-    protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-    /** External communication. */
-    private GridHadoopExternalCommunication comm;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Init guard. */
-    private final AtomicBoolean initGuard = new AtomicBoolean();
-
-    /** Start time. */
-    private long startTime;
-
-    /** Init future. */
-    private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>();
-
-    /** Job instance. */
-    private GridHadoopJob job;
-
-    /** Number of uncompleted tasks. */
-    private final AtomicInteger pendingTasks = new AtomicInteger();
-
-    /** Shuffle job. */
-    private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
-
-    /** Concurrent mappers. */
-    private int concMappers;
-
-    /** Concurrent reducers. */
-    private int concReducers;
-
-    /**
-     * Starts child process runner.
-     */
-    public void start(GridHadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
-        ExecutorService msgExecSvc, IgniteLogger parentLog)
-        throws IgniteCheckedException {
-        this.comm = comm;
-        this.nodeDesc = nodeDesc;
-        this.msgExecSvc = msgExecSvc;
-
-        comm.setListener(new MessageListener());
-        log = parentLog.getLogger(GridHadoopChildProcessRunner.class);
-
-        startTime = U.currentTimeMillis();
-
-        // At this point node knows that this process has started.
-        comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
-    }
-
-    /**
-     * Initializes process for task execution.
-     *
-     * @param req Initialization request.
-     */
-    private void prepareProcess(HadoopPrepareForJobRequest req) {
-        if (initGuard.compareAndSet(false, true)) {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Initializing external hadoop task: " + req);
-
-                assert job == null;
-
-                job = req.jobInfo().createJob(req.jobId(), log);
-
-                job.initialize(true, nodeDesc.processId());
-
-                shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
-                    req.totalReducerCount(), req.localReducers());
-
-                initializeExecutors(req);
-
-                if (log.isDebugEnabled())
-                    log.debug("External process initialized [initWaitTime=" +
-                        (U.currentTimeMillis() - startTime) + ']');
-
-                initFut.onDone(null, null);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to initialize process: " + req, e);
-
-                initFut.onDone(e);
-            }
-        }
-        else
-            log.warning("Duplicate initialize process request received (will ignore): " + req);
-    }
-
-    /**
-     * @param req Task execution request.
-     */
-    private void runTasks(final HadoopTaskExecutionRequest req) {
-        if (!initFut.isDone() && log.isDebugEnabled())
-            log.debug("Will wait for process initialization future completion: " + req);
-
-        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> f) {
-                try {
-                    // Make sure init was successful.
-                    f.get();
-
-                    boolean set = pendingTasks.compareAndSet(0, req.tasks().size());
-
-                    assert set;
-
-                    GridHadoopTaskInfo info = F.first(req.tasks());
-
-                    assert info != null;
-
-                    int size = info.type() == MAP ? concMappers : concReducers;
-
-//                    execSvc.setCorePoolSize(size);
-//                    execSvc.setMaximumPoolSize(size);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Set executor service size for task type [type=" + info.type() +
-                            ", size=" + size + ']');
-
-                    for (GridHadoopTaskInfo taskInfo : req.tasks()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Submitted task for external execution: " + taskInfo);
-
-                        execSvc.submit(new GridHadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
-                            @Override protected void onTaskFinished(HadoopTaskStatus status) {
-                                onTaskFinished0(this, status);
-                            }
-
-                            @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx)
-                                throws IgniteCheckedException {
-                                return shuffleJob.input(ctx);
-                            }
-
-                            @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx)
-                                throws IgniteCheckedException {
-                                return shuffleJob.output(ctx);
-                            }
-                        });
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    for (GridHadoopTaskInfo info : req.tasks())
-                        notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
-                }
-            }
-        });
-    }
-
-    /**
-     * Creates executor services.
-     *
-     * @param req Init child process request.
-     */
-    private void initializeExecutors(HadoopPrepareForJobRequest req) {
-        int cpus = Runtime.getRuntime().availableProcessors();
-//
-//        concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
-//        concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
-
-        execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
-    }
-
-    /**
-     * Updates external process map so that shuffle can proceed with sending messages to reducers.
-     *
-     * @param req Update request.
-     */
-    private void updateTasks(final HadoopJobInfoUpdateRequest req) {
-        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> gridFut) {
-                assert initGuard.get();
-
-                assert req.jobId().equals(job.id());
-
-                if (req.reducersAddresses() != null) {
-                    if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
-                        shuffleJob.startSending("external",
-                            new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
-                                @Override public void applyx(HadoopProcessDescriptor dest,
-                                    HadoopShuffleMessage msg) throws IgniteCheckedException {
-                                    comm.sendMessage(dest, msg);
-                                }
-                            });
-                    }
-                }
-            }
-        });
-    }
-
-    /**
-     * Stops all executors and running tasks.
-     */
-    private void shutdown() {
-        if (execSvc != null)
-            execSvc.shutdown(5000);
-
-        if (msgExecSvc != null)
-            msgExecSvc.shutdownNow();
-
-        try {
-            job.dispose(true);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to dispose job.", e);
-        }
-    }
-
-    /**
-     * Notifies node about task finish.
-     *
-     * @param run Finished task runnable.
-     * @param status Task status.
-     */
-    private void onTaskFinished0(GridHadoopRunnableTask run, HadoopTaskStatus status) {
-        GridHadoopTaskInfo info = run.taskInfo();
-
-        int pendingTasks0 = pendingTasks.decrementAndGet();
-
-        if (log.isDebugEnabled())
-            log.debug("Hadoop task execution finished [info=" + info
-                + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
-                ", pendingTasks=" + pendingTasks0 +
-                ", err=" + status.failCause() + ']');
-
-        assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
-
-        boolean flush = pendingTasks0 == 0 && info.type() == MAP;
-
-        notifyTaskFinished(info, status, flush);
-    }
-
-    /**
-     * @param taskInfo Finished task info.
-     * @param status Task status.
-     */
-    private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status,
-        boolean flush) {
-
-        final HadoopTaskState state = status.state();
-        final Throwable err = status.failCause();
-
-        if (!flush) {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
-                        ", err=" + err + ']');
-
-                comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to send message to parent node (will terminate child process).", e);
-
-                shutdown();
-
-                terminate();
-            }
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
-                    taskInfo + ", state=" + state + ", err=" + err + ']');
-
-            final long start = U.currentTimeMillis();
-
-            try {
-                shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> f) {
-                        long end = U.currentTimeMillis();
-
-                        if (log.isDebugEnabled())
-                            log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
-                                ", flushTime=" + (end - start) + ']');
-
-                        try {
-                            // Check for errors on shuffle.
-                            f.get();
-
-                            notifyTaskFinished(taskInfo, status, false);
-                        }
-                        catch (IgniteCheckedException e) {
-                            log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
-                                ", state=" + state + ", err=" + err + ']', e);
-
-                            notifyTaskFinished(taskInfo,
-                                new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
-                        }
-                    }
-                });
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
-                    ", state=" + state + ", err=" + err + ']', e);
-
-                notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
-            }
-        }
-    }
-
-    /**
-     * Checks if message was received from parent node and prints warning if not.
-     *
-     * @param desc Sender process ID.
-     * @param msg Received message.
-     * @return {@code True} if received from parent node.
-     */
-    private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
-        if (!nodeDesc.processId().equals(desc.processId())) {
-            log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
-                ", msg=" + msg + ']');
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * Stops execution of this process.
-     */
-    private void terminate() {
-        System.exit(1);
-    }
-
-    /**
-     * Message listener.
-     */
-    private class MessageListener implements GridHadoopMessageListener {
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
-            if (msg instanceof HadoopTaskExecutionRequest) {
-                if (validateNodeMessage(desc, msg))
-                    runTasks((HadoopTaskExecutionRequest)msg);
-            }
-            else if (msg instanceof HadoopJobInfoUpdateRequest) {
-                if (validateNodeMessage(desc, msg))
-                    updateTasks((HadoopJobInfoUpdateRequest)msg);
-            }
-            else if (msg instanceof HadoopPrepareForJobRequest) {
-                if (validateNodeMessage(desc, msg))
-                    prepareProcess((HadoopPrepareForJobRequest)msg);
-            }
-            else if (msg instanceof HadoopShuffleMessage) {
-                if (log.isTraceEnabled())
-                    log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
-
-                initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> f) {
-                        try {
-                            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
-
-                            shuffleJob.onShuffleMessage(m);
-
-                            comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
-                        }
-                    }
-                });
-            }
-            else if (msg instanceof HadoopShuffleAck) {
-                if (log.isTraceEnabled())
-                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
-
-                shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
-            }
-            else
-                log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
-            if (log.isDebugEnabled())
-                log.debug("Lost connection with remote process: " + desc);
-
-            if (desc == null)
-                U.warn(log, "Handshake failed.");
-            else if (desc.processId().equals(nodeDesc.processId())) {
-                log.warning("Child process lost connection with parent node (will terminate child process).");
-
-                shutdown();
-
-                terminate();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
deleted file mode 100644
index 1216c9a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.logger.log4j.*;
-import org.apache.ignite.marshaller.optimized.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Hadoop external process base class.
- */
-public class GridHadoopExternalProcessStarter {
-    /** Path to Log4j configuration file. */
-    public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
-
-    /** Arguments. */
-    private Args args;
-
-    /** System out. */
-    private OutputStream out;
-
-    /** System err. */
-    private OutputStream err;
-
-    /**
-     * @param args Parsed arguments.
-     */
-    public GridHadoopExternalProcessStarter(Args args) {
-        this.args = args;
-    }
-
-    /**
-     * @param cmdArgs Process arguments.
-     */
-    public static void main(String[] cmdArgs) {
-        try {
-            Args args = arguments(cmdArgs);
-
-            new GridHadoopExternalProcessStarter(args).run();
-        }
-        catch (Exception e) {
-            System.err.println("Failed");
-
-            System.err.println(e.getMessage());
-
-            e.printStackTrace(System.err);
-        }
-    }
-
-    /**
-     *
-     * @throws Exception
-     */
-    public void run() throws Exception {
-        U.setWorkDirectory(args.workDir, U.getIgniteHome());
-
-        File outputDir = outputDirectory();
-
-        initializeStreams(outputDir);
-
-        ExecutorService msgExecSvc = Executors.newFixedThreadPool(
-            Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2));
-
-        IgniteLogger log = logger(outputDir);
-
-        GridHadoopExternalCommunication comm = new GridHadoopExternalCommunication(
-            args.nodeId,
-            args.childProcId,
-            new OptimizedMarshaller(),
-            log,
-            msgExecSvc,
-            "external"
-        );
-
-        comm.start();
-
-        HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId);
-        nodeDesc.address(args.addr);
-        nodeDesc.tcpPort(args.tcpPort);
-        nodeDesc.sharedMemoryPort(args.shmemPort);
-
-        GridHadoopChildProcessRunner runner = new GridHadoopChildProcessRunner();
-
-        runner.start(comm, nodeDesc, msgExecSvc, log);
-
-        System.err.println("Started");
-        System.err.flush();
-
-        System.setOut(new PrintStream(out));
-        System.setErr(new PrintStream(err));
-    }
-
-    /**
-     * @param outputDir Directory for process output.
-     * @throws Exception
-     */
-    private void initializeStreams(File outputDir) throws Exception {
-        out = new FileOutputStream(new File(outputDir, args.childProcId + ".out"));
-        err = new FileOutputStream(new File(outputDir, args.childProcId + ".err"));
-    }
-
-    /**
-     * @return Path to output directory.
-     * @throws IOException If failed.
-     */
-    private File outputDirectory() throws IOException {
-        File f = new File(args.out);
-
-        if (!f.exists()) {
-            if (!f.mkdirs())
-                throw new IOException("Failed to create output directory: " + args.out);
-        }
-        else {
-            if (f.isFile())
-                throw new IOException("Output directory is a file: " + args.out);
-        }
-
-        return f;
-    }
-
-    /**
-     * @param outputDir Directory for process output.
-     * @return Logger.
-     */
-    private IgniteLogger logger(final File outputDir) {
-        final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
-
-        Log4JLogger logger;
-
-        try {
-            logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true);
-        }
-        catch (IgniteCheckedException e) {
-            System.err.println("Failed to create URL-based logger. Will use default one.");
-
-            e.printStackTrace();
-
-            logger = new Log4JLogger(true);
-        }
-
-        logger.updateFilePath(new IgniteClosure<String, String>() {
-            @Override public String apply(String s) {
-                return new File(outputDir, args.childProcId + ".log").getAbsolutePath();
-            }
-        });
-
-        return logger;
-    }
-
-    /**
-     * @param processArgs Process arguments.
-     * @return Child process instance.
-     */
-    private static Args arguments(String[] processArgs) throws Exception {
-        Args args = new Args();
-
-        for (int i = 0; i < processArgs.length; i++) {
-            String arg = processArgs[i];
-
-            switch (arg) {
-                case "-cpid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing process ID for '-cpid' parameter");
-
-                    String procIdStr = processArgs[++i];
-
-                    args.childProcId = UUID.fromString(procIdStr);
-
-                    break;
-                }
-
-                case "-ppid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing process ID for '-ppid' parameter");
-
-                    String procIdStr = processArgs[++i];
-
-                    args.parentProcId = UUID.fromString(procIdStr);
-
-                    break;
-                }
-
-                case "-nid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing node ID for '-nid' parameter");
-
-                    String nodeIdStr = processArgs[++i];
-
-                    args.nodeId = UUID.fromString(nodeIdStr);
-
-                    break;
-                }
-
-                case "-addr": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing node address for '-addr' parameter");
-
-                    args.addr = processArgs[++i];
-
-                    break;
-                }
-
-                case "-tport": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing tcp port for '-tport' parameter");
-
-                    args.tcpPort = Integer.parseInt(processArgs[++i]);
-
-                    break;
-                }
-
-                case "-sport": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing shared memory port for '-sport' parameter");
-
-                    args.shmemPort = Integer.parseInt(processArgs[++i]);
-
-                    break;
-                }
-
-                case "-out": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing output folder name for '-out' parameter");
-
-                    args.out = processArgs[++i];
-
-                    break;
-                }
-
-                case "-wd": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing work folder name for '-wd' parameter");
-
-                    args.workDir = processArgs[++i];
-
-                    break;
-                }
-            }
-        }
-
-        return args;
-    }
-
-    /**
-     * Execution arguments.
-     */
-    private static class Args {
-        /** Process ID. */
-        private UUID childProcId;
-
-        /** Process ID. */
-        private UUID parentProcId;
-
-        /** Process ID. */
-        private UUID nodeId;
-
-        /** Node address. */
-        private String addr;
-
-        /** TCP port */
-        private int tcpPort;
-
-        /** Shmem port. */
-        private int shmemPort = -1;
-
-        /** Output folder. */
-        private String out;
-
-        /** Work directory. */
-        private String workDir;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
new file mode 100644
index 0000000..6345704
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
+
+/**
+ * Runs Hadoop MAP and REDUCE tasks in an external child process on behalf of the parent node.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public class HadoopChildProcessRunner {
+    /** Descriptor of the parent node process; control requests from other senders are ignored. */
+    private HadoopProcessDescriptor nodeDesc;
+
+    /** Message processing executor service; supplied to start() and stopped in shutdown(). */
+    private ExecutorService msgExecSvc;
+
+    /** Task executor service; created in initializeExecutors(). */
+    private HadoopExecutorService execSvc;
+
+    /** Memory handed to the shuffle job and to every runnable task. */
+    protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+    /** External communication. */
+    private HadoopExternalCommunication comm;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Init guard: lets only the first prepare request initialize the process. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Time when start() was called; used to report how long the process waited for initialization. */
+    private long startTime;
+
+    /** Init future; completed by prepareProcess() either normally or with the initialization error. */
+    private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>();
+
+    /** Job instance; stays {@code null} until a prepare request creates it. */
+    private GridHadoopJob job;
+
+    /** Number of uncompleted tasks. */
+    private final AtomicInteger pendingTasks = new AtomicInteger();
+
+    /** Shuffle job; created in prepareProcess() right after the job is initialized. */
+    private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
+
+    /** Concurrent mappers; never assigned here (see commented-out code in initializeExecutors()). */
+    private int concMappers;
+
+    /** Concurrent reducers; never assigned here (see commented-out code in initializeExecutors()). */
+    private int concReducers;
+
+    /**
+     * Starts child process runner: sets up logging, registers the message listener and acks the parent node.
+     */
+    public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
+        ExecutorService msgExecSvc, IgniteLogger parentLog)
+        throws IgniteCheckedException {
+        this.comm = comm;
+        this.nodeDesc = nodeDesc;
+        this.msgExecSvc = msgExecSvc;
+
+        log = parentLog.getLogger(HadoopChildProcessRunner.class); // Must be set before the listener can fire.
+        comm.setListener(new MessageListener());
+
+        startTime = U.currentTimeMillis();
+
+        // At this point node knows that this process has started.
+        comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
+    }
+
+    /**
+     * Initializes process for task execution. Only the first request is honored; duplicates are ignored.
+     *
+     * @param req Initialization request.
+     */
+    private void prepareProcess(HadoopPrepareForJobRequest req) {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Initializing external hadoop task: " + req);
+
+                assert job == null;
+
+                job = req.jobInfo().createJob(req.jobId(), log);
+
+                job.initialize(true, nodeDesc.processId());
+
+                shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
+                    req.totalReducerCount(), req.localReducers());
+
+                initializeExecutors(req);
+
+                if (log.isDebugEnabled())
+                    log.debug("External process initialized [initWaitTime=" +
+                        (U.currentTimeMillis() - startTime) + ']');
+
+                initFut.onDone(null, null); // Releases the listeners that were waiting for initialization.
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to initialize process: " + req, e);
+
+                initFut.onDone(e); // Listeners of the init future receive the failure.
+            }
+        }
+        else
+            log.warning("Duplicate initialize process request received (will ignore): " + req);
+    }
+
+    /**
+     * @param req Task execution request; its tasks are submitted once the init future completes.
+     */
+    private void runTasks(final HadoopTaskExecutionRequest req) {
+        if (!initFut.isDone() && log.isDebugEnabled())
+            log.debug("Will wait for process initialization future completion: " + req);
+
+        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                try {
+                    // Make sure init was successful.
+                    f.get();
+
+                    boolean set = pendingTasks.compareAndSet(0, req.tasks().size());
+
+                    assert set; // Expects no tasks pending from a previous request.
+
+                    GridHadoopTaskInfo info = F.first(req.tasks());
+
+                    assert info != null;
+
+                    int size = info.type() == MAP ? concMappers : concReducers;
+                    // NOTE(review): size is only logged below; the pool-resizing calls are commented out.
+//                    execSvc.setCorePoolSize(size);
+//                    execSvc.setMaximumPoolSize(size);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Set executor service size for task type [type=" + info.type() +
+                            ", size=" + size + ']');
+
+                    for (GridHadoopTaskInfo taskInfo : req.tasks()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Submitted task for external execution: " + taskInfo);
+
+                        execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
+                            @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                                onTaskFinished0(this, status);
+                            }
+
+                            @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx)
+                                throws IgniteCheckedException {
+                                return shuffleJob.input(ctx);
+                            }
+
+                            @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx)
+                                throws IgniteCheckedException {
+                                return shuffleJob.output(ctx);
+                            }
+                        });
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    // Init failed (or submission threw): report every requested task as failed, without flushing.
+                    for (GridHadoopTaskInfo info : req.tasks())
+                        notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates the task executor service, sized from the number of available processors.
+     *
+     * @param req Init child process request (unused while the per-job settings below are commented out).
+     */
+    private void initializeExecutors(HadoopPrepareForJobRequest req) {
+        int cpus = Runtime.getRuntime().availableProcessors();
+//
+//        concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
+//        concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
+
+        execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
+    }
+
+    /**
+     * Updates external process map so that shuffle can proceed with sending messages to reducers.
+     *
+     * @param req Update request.
+     */
+    private void updateTasks(final HadoopJobInfoUpdateRequest req) {
+        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> gridFut) {
+                assert initGuard.get();
+                // NOTE(review): init result (gridFut) is not checked; shuffleJob is null if init failed.
+                assert req.jobId().equals(job.id());
+
+                if (req.reducersAddresses() != null) {
+                    if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
+                        shuffleJob.startSending("external",
+                            new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
+                                @Override public void applyx(HadoopProcessDescriptor dest,
+                                    HadoopShuffleMessage msg) throws IgniteCheckedException {
+                                    comm.sendMessage(dest, msg);
+                                }
+                            });
+                    }
+                }
+            }
+        });
+    }
+
+    /** Stops all executors and running tasks, then disposes the job if one was created. */
+    private void shutdown() {
+        if (execSvc != null)
+            execSvc.shutdown(5000);
+
+        if (msgExecSvc != null)
+            msgExecSvc.shutdownNow();
+
+        try {
+            // Job is still null if shutdown happens before a prepare request created it.
+            if (job != null)
+                job.dispose(true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to dispose job.", e);
+        }
+    }
+
+    /**
+     * Handles completion of a runnable task: decrements the pending counter and notifies the parent node.
+     *
+     * @param run Finished task runnable.
+     * @param status Task status.
+     */
+    private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) {
+        GridHadoopTaskInfo info = run.taskInfo();
+
+        int pendingTasks0 = pendingTasks.decrementAndGet();
+
+        if (log.isDebugEnabled())
+            log.debug("Hadoop task execution finished [info=" + info
+                + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
+                ", pendingTasks=" + pendingTasks0 +
+                ", err=" + status.failCause() + ']');
+
+        assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
+        // Shuffle messages are flushed only when the task that brings the counter to zero is a MAP task.
+        boolean flush = pendingTasks0 == 0 && info.type() == MAP;
+
+        notifyTaskFinished(info, status, flush);
+    }
+
+    /**
+     * @param taskInfo Finished task info.
+     * @param status Task status.
+     * @param flush If {@code true}, shuffle messages are flushed before the parent node is notified. */
+    private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status,
+        boolean flush) {
+
+        final HadoopTaskState state = status.state();
+        final Throwable err = status.failCause();
+
+        if (!flush) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
+                        ", err=" + err + ']');
+
+                comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to send message to parent node (will terminate child process).", e);
+
+                shutdown();
+
+                terminate();
+            }
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
+                    taskInfo + ", state=" + state + ", err=" + err + ']');
+
+            final long start = U.currentTimeMillis();
+
+            try {
+                shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        long end = U.currentTimeMillis();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
+                                ", flushTime=" + (end - start) + ']');
+
+                        try {
+                            // Check for errors on shuffle.
+                            f.get();
+
+                            notifyTaskFinished(taskInfo, status, false); // Flush done: send the original status.
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                                ", state=" + state + ", err=" + err + ']', e);
+
+                            notifyTaskFinished(taskInfo,
+                                new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                        }
+                    }
+                });
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                    ", state=" + state + ", err=" + err + ']', e);
+
+                notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+            }
+        }
+    }
+
+    /**
+     * Checks if message was received from parent node and prints warning if not.
+     *
+     * @param desc Sender process descriptor; its process ID is compared with the parent node's.
+     * @param msg Received message.
+     * @return {@code True} if received from parent node.
+     */
+    private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
+        if (!nodeDesc.processId().equals(desc.processId())) {
+            log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
+                ", msg=" + msg + ']');
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops execution of this process by exiting the JVM with status code 1.
+     */
+    private void terminate() {
+        System.exit(1);
+    }
+
+    /**
+     * Message listener: dispatches parent control requests and shuffle traffic, and reacts to connection loss.
+     */
+    private class MessageListener implements HadoopMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
+            if (msg instanceof HadoopTaskExecutionRequest) {
+                if (validateNodeMessage(desc, msg))
+                    runTasks((HadoopTaskExecutionRequest)msg);
+            }
+            else if (msg instanceof HadoopJobInfoUpdateRequest) {
+                if (validateNodeMessage(desc, msg))
+                    updateTasks((HadoopJobInfoUpdateRequest)msg);
+            }
+            else if (msg instanceof HadoopPrepareForJobRequest) {
+                if (validateNodeMessage(desc, msg))
+                    prepareProcess((HadoopPrepareForJobRequest)msg);
+            }
+            else if (msg instanceof HadoopShuffleMessage) {
+                if (log.isTraceEnabled())
+                    log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
+
+                initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            f.get(); // Make sure init was successful: shuffleJob is null otherwise.
+
+                            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+                            shuffleJob.onShuffleMessage(m);
+                            comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
+                        }
+                    }
+                });
+            }
+            else if (msg instanceof HadoopShuffleAck) {
+                if (log.isTraceEnabled())
+                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
+                // NOTE(review): unlike shuffle messages, acks are not deferred until init completes.
+                shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
+            }
+            else
+                log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+            if (log.isDebugEnabled())
+                log.debug("Lost connection with remote process: " + desc);
+
+            if (desc == null)
+                U.warn(log, "Handshake failed.");
+            else if (desc.processId().equals(nodeDesc.processId())) {
+                log.warning("Child process lost connection with parent node (will terminate child process).");
+
+                shutdown();
+
+                terminate();
+            }
+        }
+    }
+}


Mime
View raw message