ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [27/93] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.
Date Fri, 06 Mar 2015 19:22:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
new file mode 100644
index 0000000..39f42b2
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -0,0 +1,1626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.jobtracker;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.event.*;
+import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
+
+/**
+ * Hadoop job tracker.
+ */
+public class HadoopJobTracker extends HadoopComponent {
+    /** */
+    private final GridMutex mux = new GridMutex();
+
+    /** */
+    private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaPrj;
+
+    /** Projection with expiry policy for finished job updates. */
+    private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
+
+    /** Map-reduce execution planner. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private HadoopMapReducePlanner mrPlanner;
+
+    /** All the known jobs. */
+    private final ConcurrentMap<HadoopJobId, GridFutureAdapterEx<HadoopJob>> jobs = new ConcurrentHashMap8<>();
+
+    /** Locally active jobs. */
+    private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
+
+    /** Locally requested finish futures. */
+    private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobId>> activeFinishFuts =
+        new ConcurrentHashMap8<>();
+
+    /** Event processing service. */
+    private ExecutorService evtProcSvc;
+
+    /** Component busy lock. */
+    private GridSpinReadWriteLock busyLock;
+
+    /** Closure to check result of async transform of system cache. */
+    private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
+        @Override public void apply(IgniteInternalFuture<?> gridFut) {
+            try {
+                gridFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to transform system cache.", e);
+            }
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+        super.start(ctx);
+
+        busyLock = new GridSpinReadWriteLock();
+
+        evtProcSvc = Executors.newFixedThreadPool(1);
+    }
+
+    /**
+     * @return Job meta projection.
+     */
+    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+    private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() {
+        GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;
+
+        if (prj == null) {
+            synchronized (mux) {
+                if ((prj = jobMetaPrj) == null) {
+                    CacheProjection<Object, Object> sysCache = ctx.kernalContext().cache()
+                        .cache(CU.SYS_CACHE_HADOOP_MR);
+
+                    assert sysCache != null;
+
+                    mrPlanner = ctx.planner();
+
+                    try {
+                        ctx.kernalContext().resource().injectGeneric(mrPlanner);
+                    }
+                    catch (IgniteCheckedException e) { // Must not happen.
+                        U.error(log, "Failed to inject resources.", e);
+
+                        throw new IllegalStateException(e);
+                    }
+
+                    jobMetaPrj = prj = (GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata>)
+                        sysCache.projection(HadoopJobId.class, HadoopJobMetadata.class);
+
+                    if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
+                        ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
+                            new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl()));
+
+                        finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
+                    }
+                    else
+                        finishedJobMetaPrj = jobMetaPrj;
+                }
+            }
+        }
+
+        return prj;
+    }
+
+    /**
+     * @return Projection with expiry policy for finished job updates.
+     */
+    private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaCache() {
+        GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj;
+
+        if (prj == null) {
+            jobMetaCache();
+
+            prj = finishedJobMetaPrj;
+
+            assert prj != null;
+        }
+
+        return prj;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        jobMetaCache().context().continuousQueries().executeInternalQuery(
+            new CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata>() {
+                @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends HadoopJobId,
+                    ? extends HadoopJobMetadata>> evts) {
+                    if (!busyLock.tryReadLock())
+                        return;
+
+                    try {
+                        // Must process query callback in a separate thread to avoid deadlocks.
+                        evtProcSvc.submit(new EventHandler() {
+                            @Override protected void body() throws IgniteCheckedException {
+                                processJobMetadataUpdates(evts);
+                            }
+                        });
+                    }
+                    finally {
+                        busyLock.readUnlock();
+                    }
+                }
+            },
+            null,
+            true,
+            true
+        );
+
+        ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(final Event evt) {
+                if (!busyLock.tryReadLock())
+                    return;
+
+                try {
+                    // Must process discovery callback in a separate thread to avoid deadlock.
+                    evtProcSvc.submit(new EventHandler() {
+                        @Override protected void body() {
+                            processNodeLeft((DiscoveryEvent)evt);
+                        }
+                    });
+                }
+                finally {
+                    busyLock.readUnlock();
+                }
+            }
+        }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        busyLock.writeLock();
+
+        evtProcSvc.shutdown();
+
+        // Fail all pending futures.
+        for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values())
+            fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping)."));
+    }
+
+    /**
+     * Submits execution of Hadoop job to grid.
+     *
+     * @param jobId Job ID.
+     * @param info Job info.
+     * @return Job completion future.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<HadoopJobId> submit(HadoopJobId jobId, HadoopJobInfo info) {
+        if (!busyLock.tryReadLock()) {
+            return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " +
+                "(grid is stopping): " + info));
+        }
+
+        try {
+            long jobPrepare = U.currentTimeMillis();
+
+            if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
+                throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
+
+            HadoopJob job = job(jobId, info);
+
+            HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
+
+            HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
+
+            meta.mapReducePlan(mrPlan);
+
+            meta.pendingSplits(allSplits(mrPlan));
+            meta.pendingReducers(allReducers(mrPlan));
+
+            GridFutureAdapter<HadoopJobId> completeFut = new GridFutureAdapter<>();
+
+            GridFutureAdapter<HadoopJobId> old = activeFinishFuts.put(jobId, completeFut);
+
+            assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']';
+
+            if (log.isDebugEnabled())
+                log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']');
+
+            long jobStart = U.currentTimeMillis();
+
+            HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(),
+                ctx.localNodeId());
+
+            perfCntr.clientSubmissionEvents(info);
+            perfCntr.onJobPrepare(jobPrepare);
+            perfCntr.onJobStart(jobStart);
+
+            if (jobMetaCache().putIfAbsent(jobId, meta) != null)
+                throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
+
+            return completeFut;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to submit job: " + jobId, e);
+
+            return new GridFinishedFutureEx<>(e);
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * Convert Hadoop job metadata to job status.
+     *
+     * @param meta Metadata.
+     * @return Status.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public static HadoopJobStatus status(HadoopJobMetadata meta) {
+        HadoopJobInfo jobInfo = meta.jobInfo();
+
+        return new HadoopJobStatus(
+            meta.jobId(),
+            jobInfo.jobName(),
+            jobInfo.user(),
+            meta.pendingSplits() != null ? meta.pendingSplits().size() : 0,
+            meta.pendingReducers() != null ? meta.pendingReducers().size() : 0,
+            meta.mapReducePlan().mappers(),
+            meta.mapReducePlan().reducers(),
+            meta.phase(),
+            meta.failCause() != null,
+            meta.version()
+        );
+    }
+
+    /**
+     * Gets hadoop job status for given job ID.
+     *
+     * @param jobId Job ID to get status for.
+     * @return Job status for given job ID or {@code null} if job was not found.
+     */
+    @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock())
+            return null; // Grid is stopping.
+
+        try {
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+            return meta != null ? status(meta) : null;
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * Gets job finish future.
+     *
+     * @param jobId Job ID.
+     * @return Finish future or {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock())
+            return null; // Grid is stopping.
+
+        try {
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+            if (meta == null)
+                return null;
+
+            if (log.isTraceEnabled())
+                log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
+
+            if (meta.phase() == PHASE_COMPLETE) {
+                if (log.isTraceEnabled())
+                    log.trace("Job is complete, returning finished future: " + jobId);
+
+                return new GridFinishedFutureEx<>(jobId, meta.failCause());
+            }
+
+            GridFutureAdapter<HadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId,
+                new GridFutureAdapter<HadoopJobId>());
+
+            // Get meta from cache one more time to close the window.
+            meta = jobMetaCache().get(jobId);
+
+            if (log.isTraceEnabled())
+                log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
+
+            if (meta == null) {
+                fut.onDone();
+
+                activeFinishFuts.remove(jobId , fut);
+            }
+            else if (meta.phase() == PHASE_COMPLETE) {
+                fut.onDone(jobId, meta.failCause());
+
+                activeFinishFuts.remove(jobId , fut);
+            }
+
+            return fut;
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * Gets job plan by job ID.
+     *
+     * @param jobId Job ID.
+     * @return Job plan.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock())
+            return null;
+
+        try {
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+            if (meta != null)
+                return meta.mapReducePlan();
+
+            return null;
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * Callback from task executor invoked when a task has been finished.
+     *
+     * @param info Task info.
+     * @param status Task status.
+     */
+    @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
+    public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) {
+        if (!busyLock.tryReadLock())
+            return;
+
+        try {
+            assert status.state() != RUNNING;
+
+            if (log.isDebugEnabled())
+                log.debug("Received task finished callback [info=" + info + ", status=" + status + ']');
+
+            JobLocalState state = activeJobs.get(info.jobId());
+
+            // Task CRASHes with null fail cause.
+            assert (status.state() != FAILED) || status.failCause() != null :
+                "Invalid task status [info=" + info + ", status=" + status + ']';
+
+            assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)):
+                "Missing local state for finished task [info=" + info + ", status=" + status + ']';
+
+            StackedProcessor incrCntrs = null;
+
+            if (status.state() == COMPLETED)
+                incrCntrs = new IncrementCountersProcessor(null, status.counters());
+
+            switch (info.type()) {
+                case SETUP: {
+                    state.onSetupFinished(info, status, incrCntrs);
+
+                    break;
+                }
+
+                case MAP: {
+                    state.onMapFinished(info, status, incrCntrs);
+
+                    break;
+                }
+
+                case REDUCE: {
+                    state.onReduceFinished(info, status, incrCntrs);
+
+                    break;
+                }
+
+                case COMBINE: {
+                    state.onCombineFinished(info, status, incrCntrs);
+
+                    break;
+                }
+
+                case COMMIT:
+                case ABORT: {
+                    GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache();
+
+                    cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
+                        listenAsync(failsLog);
+
+                    break;
+                }
+            }
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * @param jobId Job id.
+     * @param c Closure of operation.
+     */
+    private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) {
+        jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
+    }
+
+    /**
+     * Callback from task executor called when process is ready to received shuffle messages.
+     *
+     * @param jobId Job ID.
+     * @param reducers Reducers.
+     * @param desc Process descriptor.
+     */
+    public void onExternalMappersInitialized(HadoopJobId jobId, Collection<Integer> reducers,
+        HadoopProcessDescriptor desc) {
+        transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
+    }
+
+    /**
+     * Gets all input splits for given hadoop map-reduce plan.
+     *
+     * @param plan Map-reduce plan.
+     * @return Collection of all input splits that should be processed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private Map<HadoopInputSplit, Integer> allSplits(HadoopMapReducePlan plan) {
+        Map<HadoopInputSplit, Integer> res = new HashMap<>();
+
+        int taskNum = 0;
+
+        for (UUID nodeId : plan.mapperNodeIds()) {
+            for (HadoopInputSplit split : plan.mappers(nodeId)) {
+                if (res.put(split, taskNum++) != null)
+                    throw new IllegalStateException("Split duplicate.");
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Gets all reducers for this job.
+     *
+     * @param plan Map-reduce plan.
+     * @return Collection of reducers.
+     */
+    private Collection<Integer> allReducers(HadoopMapReducePlan plan) {
+        Collection<Integer> res = new HashSet<>();
+
+        for (int i = 0; i < plan.reducers(); i++)
+            res.add(i);
+
+        return res;
+    }
+
+    /**
+     * Processes node leave (or fail) event.
+     *
+     * @param evt Discovery event.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void processNodeLeft(DiscoveryEvent evt) {
+        if (log.isDebugEnabled())
+            log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']');
+
+        // Check only if this node is responsible for job status updates.
+        if (ctx.jobUpdateLeader()) {
+            boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder();
+
+            // Iteration over all local entries is correct since system cache is REPLICATED.
+            for (Object metaObj : jobMetaCache().values()) {
+                HadoopJobMetadata meta = (HadoopJobMetadata)metaObj;
+
+                HadoopJobId jobId = meta.jobId();
+
+                HadoopMapReducePlan plan = meta.mapReducePlan();
+
+                HadoopJobPhase phase = meta.phase();
+
+                try {
+                    if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
+                        // Failover setup task.
+                        HadoopJob job = job(jobId, meta.jobInfo());
+
+                        Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
+
+                        assert setupTask != null;
+
+                        ctx.taskExecutor().run(job, setupTask);
+                    }
+                    else if (phase == PHASE_MAP || phase == PHASE_REDUCE) {
+                        // Must check all nodes, even that are not event node ID due to
+                        // multiple node failure possibility.
+                        Collection<HadoopInputSplit> cancelSplits = null;
+
+                        for (UUID nodeId : plan.mapperNodeIds()) {
+                            if (ctx.kernalContext().discovery().node(nodeId) == null) {
+                                // Node has left the grid.
+                                Collection<HadoopInputSplit> mappers = plan.mappers(nodeId);
+
+                                if (cancelSplits == null)
+                                    cancelSplits = new HashSet<>();
+
+                                cancelSplits.addAll(mappers);
+                            }
+                        }
+
+                        Collection<Integer> cancelReducers = null;
+
+                        for (UUID nodeId : plan.reducerNodeIds()) {
+                            if (ctx.kernalContext().discovery().node(nodeId) == null) {
+                                // Node has left the grid.
+                                int[] reducers = plan.reducers(nodeId);
+
+                                if (cancelReducers == null)
+                                    cancelReducers = new HashSet<>();
+
+                                for (int rdc : reducers)
+                                    cancelReducers.add(rdc);
+                            }
+                        }
+
+                        if (cancelSplits != null || cancelReducers != null)
+                            jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException(
+                                "One or more nodes participating in map-reduce job execution failed."), cancelSplits,
+                                cancelReducers));
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to cancel job: " + meta, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param updated Updated cache entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processJobMetadataUpdates(
+        Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> updated)
+        throws IgniteCheckedException {
+        UUID locNodeId = ctx.localNodeId();
+
+        for (CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata> entry : updated) {
+            HadoopJobId jobId = entry.getKey();
+            HadoopJobMetadata meta = entry.getValue();
+
+            if (meta == null || !ctx.isParticipating(meta))
+                continue;
+
+            if (log.isDebugEnabled())
+                log.debug("Processing job metadata update callback [locNodeId=" + locNodeId +
+                    ", meta=" + meta + ']');
+
+            try {
+                ctx.taskExecutor().onJobStateChanged(meta);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to process job state changed callback (will fail the job) " +
+                    "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);
+
+                transform(jobId, new CancelJobProcessor(null, e));
+
+                continue;
+            }
+
+            processJobMetaUpdate(jobId, meta, locNodeId);
+        }
+    }
+
+    /**
+     * @param jobId  Job ID.
+     * @param plan Map-reduce plan.
+     */
+    private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) {
+        log.info("Plan for " + jobId);
+
+        SB b = new SB();
+
+        b.a("   Map: ");
+
+        for (UUID nodeId : plan.mapperNodeIds())
+            b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' ');
+
+        log.info(b.toString());
+
+        b = new SB();
+
+        b.a("   Reduce: ");
+
+        for (UUID nodeId : plan.reducerNodeIds())
+            b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' ');
+
+        log.info(b.toString());
+    }
+
+    /**
+     * @param jobId Job ID.
+     * @param meta Job metadata.
+     * @param locNodeId Local node ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId)
+        throws IgniteCheckedException {
+        JobLocalState state = activeJobs.get(jobId);
+
+        HadoopJob job = job(jobId, meta.jobInfo());
+
+        HadoopMapReducePlan plan = meta.mapReducePlan();
+
+        switch (meta.phase()) {
+            case PHASE_SETUP: {
+                if (ctx.jobUpdateLeader()) {
+                    Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
+
+                    if (setupTask != null)
+                        ctx.taskExecutor().run(job, setupTask);
+                }
+
+                break;
+            }
+
+            case PHASE_MAP: {
+                // Check if we should initiate new task on local node.
+                Collection<HadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta);
+
+                if (tasks != null)
+                    ctx.taskExecutor().run(job, tasks);
+
+                break;
+            }
+
+            case PHASE_REDUCE: {
+                if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) {
+                    HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId +
+                                ", jobId=" + jobId + ']');
+
+                    ctx.taskExecutor().run(job, Collections.singletonList(info));
+
+                    break;
+                }
+
+                Collection<HadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job);
+
+                if (tasks != null)
+                    ctx.taskExecutor().run(job, tasks);
+
+                break;
+            }
+
+            case PHASE_CANCELLING: {
+                // Prevent multiple task executor notification.
+                if (state != null && state.onCancel()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Cancelling local task execution for job: " + meta);
+
+                    ctx.taskExecutor().cancelTasks(jobId);
+                }
+
+                if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) {
+                    if (ctx.jobUpdateLeader()) {
+                        if (state == null)
+                            state = initState(jobId);
+
+                        // Prevent running multiple abort tasks.
+                        if (state.onAborted()) {
+                            HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId +
+                                        ", jobId=" + jobId + ']');
+
+                            ctx.taskExecutor().run(job, Collections.singletonList(info));
+                        }
+                    }
+
+                    break;
+                }
+                else {
+                    // Check if there are unscheduled mappers or reducers.
+                    Collection<HadoopInputSplit> cancelMappers = new ArrayList<>();
+                    Collection<Integer> cancelReducers = new ArrayList<>();
+
+                    Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
+
+                    if (mappers != null) {
+                        for (HadoopInputSplit b : mappers) {
+                            if (state == null || !state.mapperScheduled(b))
+                                cancelMappers.add(b);
+                        }
+                    }
+
+                    int[] rdc = plan.reducers(ctx.localNodeId());
+
+                    if (rdc != null) {
+                        for (int r : rdc) {
+                            if (state == null || !state.reducerScheduled(r))
+                                cancelReducers.add(r);
+                        }
+                    }
+
+                    if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
+                        transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
+                }
+
+                break;
+            }
+
+            case PHASE_COMPLETE: {
+                if (log.isDebugEnabled())
+                    log.debug("Job execution is complete, will remove local state from active jobs " +
+                        "[jobId=" + jobId + ", meta=" + meta + ']');
+
+                if (state != null) {
+                    state = activeJobs.remove(jobId);
+
+                    assert state != null;
+
+                    ctx.shuffle().jobFinished(jobId);
+                }
+
+                GridFutureAdapter<HadoopJobId> finishFut = activeFinishFuts.remove(jobId);
+
+                if (finishFut != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']');
+
+                    finishFut.onDone(jobId, meta.failCause());
+                }
+
+                if (ctx.jobUpdateLeader())
+                    job.cleanupStagingDirectory();
+
+                jobs.remove(jobId);
+
+                job.dispose(false);
+
+                if (ctx.jobUpdateLeader()) {
+                    ClassLoader ldr = job.getClass().getClassLoader();
+
+                    try {
+                        String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY);
+
+                        if (statWriterClsName != null) {
+                            Class<?> cls = ldr.loadClass(statWriterClsName);
+
+                            HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance();
+
+                            HadoopCounters cntrs = meta.counters();
+
+                            writer.write(job.info(), jobId, cntrs);
+                        }
+                    }
+                    catch (Exception e) {
+                        log.error("Can't write statistic due to: ", e);
+                    }
+                }
+
+                break;
+            }
+
+            default:
+                throw new IllegalStateException("Unknown phase: " + meta.phase());
+        }
+    }
+
+    /**
+     * Creates setup task based on job information.
+     *
+     * @param jobId Job ID.
+     * @return Setup task wrapped in collection.
+     */
+    @Nullable private Collection<HadoopTaskInfo> setupTask(HadoopJobId jobId) {
+        if (activeJobs.containsKey(jobId))
+            return null;
+        else {
+            initState(jobId);
+
+            return Collections.singleton(new HadoopTaskInfo(SETUP, jobId, 0, 0, null));
+        }
+    }
+
+    /**
+     * Creates mapper tasks based on job information.
+     *
+     * @param mappers Mapper blocks.
+     * @param meta Job metadata.
+     * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
+     */
+    private Collection<HadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) {
+        UUID locNodeId = ctx.localNodeId();
+        HadoopJobId jobId = meta.jobId();
+
+        JobLocalState state = activeJobs.get(jobId);
+
+        Collection<HadoopTaskInfo> tasks = null;
+
+        if (mappers != null) {
+            if (state == null)
+                state = initState(jobId);
+
+            for (HadoopInputSplit split : mappers) {
+                if (state.addMapper(split)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId +
+                            ", split=" + split + ']');
+
+                    HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);
+
+                    if (tasks == null)
+                        tasks = new ArrayList<>();
+
+                    tasks.add(taskInfo);
+                }
+            }
+        }
+
+        return tasks;
+    }
+
+    /**
+     * Creates reducer tasks based on job information.
+     *
+     * @param reducers Reducers (may be {@code null}).
+     * @param job Job instance.
+     * @return Collection of task infos.
+     */
+    private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) {
+        UUID locNodeId = ctx.localNodeId();
+        HadoopJobId jobId = job.id();
+
+        JobLocalState state = activeJobs.get(jobId);
+
+        Collection<HadoopTaskInfo> tasks = null;
+
+        if (reducers != null) {
+            if (state == null)
+                state = initState(job.id());
+
+            for (int rdc : reducers) {
+                if (state.addReducer(rdc)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId +
+                            ", rdc=" + rdc + ']');
+
+                    HadoopTaskInfo taskInfo = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null);
+
+                    if (tasks == null)
+                        tasks = new ArrayList<>();
+
+                    tasks.add(taskInfo);
+                }
+            }
+        }
+
+        return tasks;
+    }
+
+    /**
+     * Initializes local state for given job metadata.
+     *
+     * @param jobId Job ID.
+     * @return Local state.
+     */
+    private JobLocalState initState(HadoopJobId jobId) {
+        return F.addIfAbsent(activeJobs, jobId, new JobLocalState());
+    }
+
+    /**
+     * Gets or creates job instance.
+     *
+     * @param jobId Job ID.
+     * @param jobInfo Job info.
+     * @return Job.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
+        GridFutureAdapterEx<HadoopJob> fut = jobs.get(jobId);
+
+        if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<HadoopJob>())) != null)
+            return fut.get();
+
+        fut = jobs.get(jobId);
+
+        HadoopJob job = null;
+
+        try {
+            if (jobInfo == null) {
+                HadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+                if (meta == null)
+                    throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);
+
+                jobInfo = meta.jobInfo();
+            }
+
+            job = jobInfo.createJob(jobId, log);
+
+            job.initialize(false, ctx.localNodeId());
+
+            fut.onDone(job);
+
+            return job;
+        }
+        catch (IgniteCheckedException e) {
+            fut.onDone(e);
+
+            jobs.remove(jobId, fut);
+
+            if (job != null) {
+                try {
+                    job.dispose(false);
+                }
+                catch (IgniteCheckedException e0) {
+                    U.error(log, "Failed to dispose job: " + jobId, e0);
+                }
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Kills job.
+     *
+     * @param jobId Job ID.
+     * @return {@code True} if job was killed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock())
+            return false; // Grid is stopping.
+
+        try {
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+            if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
+                HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled.");
+
+                jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err));
+            }
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+
+        IgniteInternalFuture<?> fut = finishFuture(jobId);
+
+        if (fut != null) {
+            try {
+                fut.get();
+            }
+            catch (Throwable e) {
+                if (e.getCause() instanceof HadoopTaskCancelledException)
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns job counters.
+     *
+     * @param jobId Job identifier.
+     * @return Job counters or {@code null} if job cannot be found.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock())
+            return null;
+
+        try {
+            final HadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+            return meta != null ? meta.counters() : null;
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * Event handler protected by busy lock.
+     */
+    private abstract class EventHandler implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            if (!busyLock.tryReadLock())
+                return;
+
+            try {
+                body();
+            }
+            catch (Throwable e) {
+                U.error(log, "Unhandled exception while processing event.", e);
+            }
+            finally {
+                busyLock.readUnlock();
+            }
+        }
+
+        /**
+         * Handler body.
+         */
+        protected abstract void body() throws Exception;
+    }
+
+    /**
+     *
+     */
+    private class JobLocalState {
+        /** Mappers. */
+        private final Collection<HadoopInputSplit> currMappers = new HashSet<>();
+
+        /** Reducers. */
+        private final Collection<Integer> currReducers = new HashSet<>();
+
+        /** Number of completed mappers. */
+        private final AtomicInteger completedMappersCnt = new AtomicInteger();
+
+        /** Cancelled flag. */
+        private boolean cancelled;
+
+        /** Aborted flag. */
+        private boolean aborted;
+
+        /**
+         * @param mapSplit Map split to add.
+         * @return {@code True} if mapper was added.
+         */
+        private boolean addMapper(HadoopInputSplit mapSplit) {
+            return currMappers.add(mapSplit);
+        }
+
+        /**
+         * @param rdc Reducer number to add.
+         * @return {@code True} if reducer was added.
+         */
+        private boolean addReducer(int rdc) {
+            return currReducers.add(rdc);
+        }
+
+        /**
+         * Checks whether this split was scheduled for given attempt.
+         *
+         * @param mapSplit Map split to check.
+         * @return {@code True} if mapper was scheduled.
+         */
+        public boolean mapperScheduled(HadoopInputSplit mapSplit) {
+            return currMappers.contains(mapSplit);
+        }
+
+        /**
+         * Checks whether this split was scheduled for given attempt.
+         *
+         * @param rdc Reducer number to check.
+         * @return {@code True} if reducer was scheduled.
+         */
+        public boolean reducerScheduled(int rdc) {
+            return currReducers.contains(rdc);
+        }
+
+        /**
+         * @param taskInfo Task info.
+         * @param status Task status.
+         * @param prev Previous closure.
+         */
+        private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
+            final HadoopJobId jobId = taskInfo.jobId();
+
+            if (status.state() == FAILED || status.state() == CRASHED)
+                transform(jobId, new CancelJobProcessor(prev, status.failCause()));
+            else
+                transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP));
+        }
+
+        /**
+         * @param taskInfo Task info.
+         * @param status Task status.
+         * @param prev Previous closure.
+         */
+        private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status,
+            final StackedProcessor prev) {
+            final HadoopJobId jobId = taskInfo.jobId();
+
+            boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();
+
+            if (status.state() == FAILED || status.state() == CRASHED) {
+                // Fail the whole job.
+                transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));
+
+                return;
+            }
+
+            IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() {
+                @Override public void applyx(IgniteInternalFuture<?> f) {
+                    Throwable err = null;
+
+                    if (f != null) {
+                        try {
+                            f.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            err = e;
+                        }
+                    }
+
+                    transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
+                }
+            };
+
+            if (lastMapperFinished)
+                ctx.shuffle().flush(jobId).listenAsync(cacheUpdater);
+            else
+                cacheUpdater.apply(null);
+        }
+
+        /**
+         * @param taskInfo Task info.
+         * @param status Task status.
+         * @param prev Previous closure.
+         */
+        private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
+            HadoopJobId jobId = taskInfo.jobId();
+            if (status.state() == FAILED || status.state() == CRASHED)
+                // Fail the whole job.
+                transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()));
+            else
+                transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber()));
+        }
+
+        /**
+         * @param taskInfo Task info.
+         * @param status Task status.
+         * @param prev Previous closure.
+         */
+        private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status,
+            final StackedProcessor prev) {
+            final HadoopJobId jobId = taskInfo.jobId();
+
+            if (status.state() == FAILED || status.state() == CRASHED)
+                // Fail the whole job.
+                transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));
+            else {
+                ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteInternalFuture<?>>() {
+                    @Override public void applyx(IgniteInternalFuture<?> f) {
+                        Throwable err = null;
+
+                        if (f != null) {
+                            try {
+                                f.get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                err = e;
+                            }
+                        }
+
+                        transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
+                    }
+                });
+            }
+        }
+
+        /**
+         * @return {@code True} if job was cancelled by this (first) call.
+         */
+        public boolean onCancel() {
+            if (!cancelled && !aborted) {
+                cancelled = true;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @return {@code True} if job was aborted this (first) call.
+         */
+        public boolean onAborted() {
+            if (!aborted) {
+                aborted = true;
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Update job phase transform closure.
+     */
+    private static class UpdatePhaseProcessor extends StackedProcessor {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Phase to update. */
+        private final HadoopJobPhase phase;
+
+        /**
+         * @param prev Previous closure.
+         * @param phase Phase to update.
+         */
+        private UpdatePhaseProcessor(@Nullable StackedProcessor prev, HadoopJobPhase phase) {
+            super(prev);
+
+            this.phase = phase;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            cp.phase(phase);
+        }
+    }
+
+    /**
+     * Remove mapper transform closure.
+     */
+    private static class RemoveMappersProcessor extends StackedProcessor {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Mapper split to remove. */
+        private final Collection<HadoopInputSplit> splits;
+
+        /** Error. */
+        private final Throwable err;
+
+        /**
+         * @param prev Previous closure.
+         * @param split Mapper split to remove.
+         * @param err Error.
+         */
+        private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) {
+            this(prev, Collections.singletonList(split), err);
+        }
+
+        /**
+         * @param prev Previous closure.
+         * @param splits Mapper splits to remove.
+         * @param err Error.
+         */
+        private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<HadoopInputSplit> splits,
+            Throwable err) {
+            super(prev);
+
+            this.splits = splits;
+            this.err = err;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+
+            for (HadoopInputSplit s : splits)
+                splitsCp.remove(s);
+
+            cp.pendingSplits(splitsCp);
+
+            if (cp.phase() != PHASE_CANCELLING && err != null)
+                cp.failCause(err);
+
+            if (err != null)
+                cp.phase(PHASE_CANCELLING);
+
+            if (splitsCp.isEmpty()) {
+                if (cp.phase() != PHASE_CANCELLING)
+                    cp.phase(PHASE_REDUCE);
+            }
+        }
+    }
+
+    /**
+     * Remove reducer transform closure.
+     */
+    private static class RemoveReducerProcessor extends StackedProcessor {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Mapper split to remove. */
+        private final int rdc;
+
+        /** Error. */
+        private Throwable err;
+
+        /**
+         * @param prev Previous closure.
+         * @param rdc Reducer to remove.
+         */
+        private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) {
+            super(prev);
+
+            this.rdc = rdc;
+        }
+
+        /**
+         * @param prev Previous closure.
+         * @param rdc Reducer to remove.
+         * @param err Error.
+         */
+        private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) {
+            super(prev);
+
+            this.rdc = rdc;
+            this.err = err;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
+
+            rdcCp.remove(rdc);
+
+            cp.pendingReducers(rdcCp);
+
+            if (err != null) {
+                cp.phase(PHASE_CANCELLING);
+                cp.failCause(err);
+            }
+        }
+    }
+
+    /**
+     * Initialize reducers.
+     */
+    private static class InitializeReducersProcessor extends StackedProcessor {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Reducers. */
+        private final Collection<Integer> rdc;
+
+        /** Process descriptor for reducers. */
+        private final HadoopProcessDescriptor desc;
+
+        /**
+         * @param prev Previous closure.
+         * @param rdc Reducers to initialize.
+         * @param desc External process descriptor.
+         */
+        private InitializeReducersProcessor(@Nullable StackedProcessor prev,
+            Collection<Integer> rdc,
+            HadoopProcessDescriptor desc) {
+            super(prev);
+
+            assert !F.isEmpty(rdc);
+            assert desc != null;
+
+            this.rdc = rdc;
+            this.desc = desc;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses();
+
+            Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ?
+                new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap);
+
+            for (Integer r : rdc)
+                rdcMap.put(r, desc);
+
+            cp.reducersAddresses(rdcMap);
+        }
+    }
+
+    /**
+     * Remove reducer transform closure.
+     */
+    private static class CancelJobProcessor extends StackedProcessor {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Mapper split to remove. */
+        private final Collection<HadoopInputSplit> splits;
+
+        /** Reducers to remove. */
+        private final Collection<Integer> rdc;
+
+        /** Error. */
+        private final Throwable err;
+
+        /**
+         * @param prev Previous closure.
+         * @param err Fail cause.
+         */
+        private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
+            this(prev, err, null, null);
+        }
+
+        /**
+         * @param prev Previous closure.
+         * @param splits Splits to remove.
+         * @param rdc Reducers to remove.
+         */
+        private CancelJobProcessor(@Nullable StackedProcessor prev,
+            Collection<HadoopInputSplit> splits,
+            Collection<Integer> rdc) {
+            this(prev, null, splits, rdc);
+        }
+
+        /**
+         * @param prev Previous closure.
+         * @param err Error.
+         * @param splits Splits to remove.
+         * @param rdc Reducers to remove.
+         */
+        private CancelJobProcessor(@Nullable StackedProcessor prev,
+            Throwable err,
+            Collection<HadoopInputSplit> splits,
+            Collection<Integer> rdc) {
+            super(prev);
+
+            this.splits = splits;
+            this.rdc = rdc;
+            this.err = err;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta;
+
+            Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
+
+            if (rdc != null)
+                rdcCp.removeAll(rdc);
+
+            cp.pendingReducers(rdcCp);
+
+            Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+
+            if (splits != null) {
+                for (HadoopInputSplit s : splits)
+                    splitsCp.remove(s);
+            }
+
+            cp.pendingSplits(splitsCp);
+
+            cp.phase(PHASE_CANCELLING);
+
+            if (err != null)
+                cp.failCause(err);
+        }
+    }
+
+    /**
+     * Increment counter values closure.
+     */
+    private static class IncrementCountersProcessor extends StackedProcessor {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final HadoopCounters counters;
+
+        /**
+         * @param prev Previous closure.
+         * @param counters Task counters to add into job counters.
+         */
+        private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) {
+            super(prev);
+
+            assert counters != null;
+
+            this.counters = counters;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            HadoopCounters cntrs = new HadoopCountersImpl(cp.counters());
+
+            cntrs.merge(counters);
+
+            cp.counters(cntrs);
+        }
+    }
+
+    /**
+     * Abstract stacked closure.
+     */
+    private abstract static class StackedProcessor implements
+        EntryProcessor<HadoopJobId, HadoopJobMetadata, Void>, Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final StackedProcessor prev;
+
+        /**
+         * @param prev Previous closure.
+         */
+        private StackedProcessor(@Nullable StackedProcessor prev) {
+            this.prev = prev;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<HadoopJobId, HadoopJobMetadata> e, Object... args) {
+            HadoopJobMetadata val = apply(e.getValue());
+
+            if (val != null)
+                e.setValue(val);
+            else
+                e.remove();;
+
+            return null;
+        }
+
+        /**
+         * @param meta Old value.
+         * @return New value.
+         */
+        private HadoopJobMetadata apply(HadoopJobMetadata meta) {
+            if (meta == null)
+                return null;
+
+            HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta);
+
+            update(meta, cp);
+
+            return cp;
+        }
+
+        /**
+         * Update given job metadata object.
+         *
+         * @param meta Initial job metadata.
+         * @param cp Copy.
+         */
+        protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
deleted file mode 100644
index 1670a8a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.message;
-
-import java.io.*;
-
-/**
- * Marker interface for all hadoop messages.
- */
-public interface GridHadoopMessage extends Externalizable {
-    // No-op.
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
new file mode 100644
index 0000000..cab6138
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.message;
+
+import java.io.*;
+
+/**
+ * Marker interface for all hadoop messages.
+ */
+public interface HadoopMessage extends Externalizable {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
deleted file mode 100644
index 7988403..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Map-reduce plan.
- */
-public class GridHadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Mappers map. */
-    private Map<UUID, Collection<GridHadoopInputSplit>> mappers;
-
-    /** Reducers map. */
-    private Map<UUID, int[]> reducers;
-
-    /** Mappers count. */
-    private int mappersCnt;
-
-    /** Reducers count. */
-    private int reducersCnt;
-
-    /**
-     * @param mappers Mappers map.
-     * @param reducers Reducers map.
-     */
-    public GridHadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers,
-        Map<UUID, int[]> reducers) {
-        this.mappers = mappers;
-        this.reducers = reducers;
-
-        if (mappers != null) {
-            for (Collection<GridHadoopInputSplit> splits : mappers.values())
-                mappersCnt += splits.size();
-        }
-
-        if (reducers != null) {
-            for (int[] rdcrs : reducers.values())
-                reducersCnt += rdcrs.length;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int mappers() {
-        return mappersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int reducers() {
-        return reducersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID nodeForReducer(int reducer) {
-        assert reducer >= 0 && reducer < reducersCnt : reducer;
-
-        for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
-            for (int r : entry.getValue()) {
-                if (r == reducer)
-                    return entry.getKey();
-            }
-        }
-
-        throw new IllegalStateException("Not found reducer index: " + reducer);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) {
-        return mappers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public int[] reducers(UUID nodeId) {
-        return reducers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> mapperNodeIds() {
-        return mappers.keySet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> reducerNodeIds() {
-        return reducers.keySet();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
deleted file mode 100644
index 342cbab..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.igfs.hadoop.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.IgniteFs.*;
-
-/**
- * Default map-reduce planner implementation.
- */
-public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner {
-    /** Injected grid. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** Logger. */
-    @SuppressWarnings("UnusedDeclaration")
-    @LoggerResource
-    private IgniteLogger log;
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top,
-        @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException {
-        // Convert collection of topology nodes to collection of topology node IDs.
-        Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
-
-        for (ClusterNode topNode : top)
-            topIds.add(topNode.id());
-
-        Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input());
-
-        int rdcCnt = job.info().reducers();
-
-        if (rdcCnt < 0)
-            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
-
-        Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
-
-        return new GridHadoopDefaultMapReducePlan(mappers, reducers);
-    }
-
-    /**
-     * Create plan for mappers.
-     *
-     * @param top Topology nodes.
-     * @param topIds Topology node IDs.
-     * @param splits Splits.
-     * @return Mappers map.
-     * @throws IgniteCheckedException If failed.
-     */
-    private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
-        Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException {
-        Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>();
-
-        Map<String, Collection<UUID>> nodes = hosts(top);
-
-        Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
-
-        for (UUID nodeId : topIds)
-            nodeLoads.put(nodeId, 0);
-
-        for (GridHadoopInputSplit split : splits) {
-            UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
-
-            if (log.isDebugEnabled())
-                log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
-
-            Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId);
-
-            if (nodeSplits == null) {
-                nodeSplits = new ArrayList<>();
-
-                mappers.put(nodeId, nodeSplits);
-            }
-
-            nodeSplits.add(split);
-
-            // Updated node load.
-            nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
-        }
-
-        return mappers;
-    }
-
-    /**
-     * Groups nodes by host names.
-     *
-     * @param top Topology to group.
-     * @return Map.
-     */
-    private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) {
-        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
-        for (ClusterNode node : top) {
-            for (String host : node.hostNames()) {
-                Collection<UUID> nodeIds = grouped.get(host);
-
-                if (nodeIds == null) {
-                    // Expecting 1-2 nodes per host.
-                    nodeIds = new ArrayList<>(2);
-
-                    grouped.put(host, nodeIds);
-                }
-
-                nodeIds.add(node.id());
-            }
-        }
-
-        return grouped;
-    }
-
-    /**
-     * Determine the best node for this split.
-     *
-     * @param split Split.
-     * @param topIds Topology node IDs.
-     * @param nodes Nodes.
-     * @param nodeLoads Node load tracker.
-     * @return Node ID.
-     */
-    @SuppressWarnings("unchecked")
-    private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
-        Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
-        if (split instanceof GridHadoopFileBlock) {
-            GridHadoopFileBlock split0 = (GridHadoopFileBlock)split;
-
-            if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
-                IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(split0.file().getAuthority());
-
-                IgfsEx igfs = null;
-
-                if (F.eq(ignite.name(), endpoint.grid()))
-                    igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
-
-                if (igfs != null && !igfs.isProxy(split0.file())) {
-                    Collection<IgfsBlockLocation> blocks;
-
-                    try {
-                        blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length());
-                    }
-                    catch (IgniteException e) {
-                        throw new IgniteCheckedException(e);
-                    }
-
-                    assert blocks != null;
-
-                    if (blocks.size() == 1)
-                        // Fast-path, split consists of one IGFS block (as in most cases).
-                        return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
-                    else {
-                        // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
-                        Map<UUID, Long> nodeMap = new HashMap<>();
-
-                        List<UUID> bestNodeIds = null;
-                        long bestLen = -1L;
-
-                        for (IgfsBlockLocation block : blocks) {
-                            for (UUID blockNodeId : block.nodeIds()) {
-                                if (topIds.contains(blockNodeId)) {
-                                    Long oldLen = nodeMap.get(blockNodeId);
-                                    long newLen = oldLen == null ? block.length() : oldLen + block.length();
-
-                                    nodeMap.put(blockNodeId, newLen);
-
-                                    if (bestNodeIds == null || bestLen < newLen) {
-                                        bestNodeIds = new ArrayList<>(1);
-
-                                        bestNodeIds.add(blockNodeId);
-
-                                        bestLen = newLen;
-                                    }
-                                    else if (bestLen == newLen) {
-                                        assert !F.isEmpty(bestNodeIds);
-
-                                        bestNodeIds.add(blockNodeId);
-                                    }
-                                }
-                            }
-                        }
-
-                        if (bestNodeIds != null) {
-                            return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
-                                bestNode(bestNodeIds, topIds, nodeLoads, true);
-                        }
-                    }
-                }
-            }
-        }
-
-        // Cannot use local IGFS for some reason, try selecting the node by host.
-        Collection<UUID> blockNodes = null;
-
-        for (String host : split.hosts()) {
-            Collection<UUID> hostNodes = nodes.get(host);
-
-            if (!F.isEmpty(hostNodes)) {
-                if (blockNodes == null)
-                    blockNodes = new ArrayList<>(hostNodes);
-                else
-                    blockNodes.addAll(hostNodes);
-            }
-        }
-
-        return bestNode(blockNodes, topIds, nodeLoads, false);
-    }
-
-    /**
-     * Finds the best (the least loaded) node among the candidates.
-     *
-     * @param candidates Candidates.
-     * @param topIds Topology node IDs.
-     * @param nodeLoads Known node loads.
-     * @param skipTopCheck Whether to skip topology check.
-     * @return The best node.
-     */
-    private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
-        boolean skipTopCheck) {
-        UUID bestNode = null;
-        int bestLoad = Integer.MAX_VALUE;
-
-        if (candidates != null) {
-            for (UUID candidate : candidates) {
-                if (skipTopCheck || topIds.contains(candidate)) {
-                    int load = nodeLoads.get(candidate);
-
-                    if (bestNode == null || bestLoad > load) {
-                        bestNode = candidate;
-                        bestLoad = load;
-
-                        if (bestLoad == 0)
-                            break; // Minimum load possible, no need for further iterations.
-                    }
-                }
-            }
-        }
-
-        if (bestNode == null) {
-            // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
-            bestLoad = Integer.MAX_VALUE;
-
-            for (UUID nodeId : topIds) {
-                int load = nodeLoads.get(nodeId);
-
-                if (bestNode == null || bestLoad > load) {
-                    bestNode = nodeId;
-                    bestLoad = load;
-
-                    if (bestLoad == 0)
-                        break; // Minimum load possible, no need for further iterations.
-                }
-            }
-        }
-
-        assert bestNode != null;
-
-        return bestNode;
-    }
-
-    /**
-     * Create plan for reducers.
-     *
-     * @param top Topology.
-     * @param mappers Mappers map.
-     * @param reducerCnt Reducers count.
-     * @return Reducers map.
-     */
-    private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
-        Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) {
-        // Determine initial node weights.
-        int totalWeight = 0;
-
-        List<WeightedNode> nodes = new ArrayList<>(top.size());
-
-        for (ClusterNode node : top) {
-            Collection<GridHadoopInputSplit> split = mappers.get(node.id());
-
-            int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
-
-            nodes.add(new WeightedNode(node.id(), weight, weight));
-
-            totalWeight += weight;
-        }
-
-        // Adjust weights.
-        int totalAdjustedWeight = 0;
-
-        for (WeightedNode node : nodes) {
-            node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
-
-            node.weight = Math.round(node.floatWeight);
-
-            totalAdjustedWeight += node.weight;
-        }
-
-        // Apply redundant/lost reducers.
-        Collections.sort(nodes);
-
-        if (totalAdjustedWeight > reducerCnt) {
-            // Too much reducers set.
-            ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
-
-            while (totalAdjustedWeight != reducerCnt) {
-                if (!iter.hasPrevious())
-                    iter = nodes.listIterator(nodes.size() - 1);
-
-                WeightedNode node = iter.previous();
-
-                if (node.weight > 0) {
-                    node.weight -= 1;
-
-                    totalAdjustedWeight--;
-                }
-            }
-        }
-        else if (totalAdjustedWeight < reducerCnt) {
-            // Not enough reducers set.
-            ListIterator<WeightedNode> iter = nodes.listIterator(0);
-
-            while (totalAdjustedWeight != reducerCnt) {
-                if (!iter.hasNext())
-                    iter = nodes.listIterator(0);
-
-                WeightedNode node = iter.next();
-
-                if (node.floatWeight > 0.0f) {
-                    node.weight += 1;
-
-                    totalAdjustedWeight++;
-                }
-            }
-        }
-
-        int idx = 0;
-
-        Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
-
-        for (WeightedNode node : nodes) {
-            if (node.weight > 0) {
-                int[] arr = new int[node.weight];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = idx++;
-
-                reducers.put(node.nodeId, arr);
-            }
-        }
-
-        return reducers;
-    }
-
-    /**
-     * Calculate node weight based on node metrics and data co-location.
-     *
-     * @param node Node.
-     * @param splitCnt Splits mapped to this node.
-     * @return Node weight.
-     */
-    @SuppressWarnings("UnusedParameters")
-    protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
-        return splitCnt;
-    }
-
-    /**
-     * Weighted node.
-     */
-    private static class WeightedNode implements Comparable<WeightedNode> {
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Weight. */
-        private int weight;
-
-        /** Floating point weight. */
-        private float floatWeight;
-
-        /**
-         * Constructor.
-         *
-         * @param nodeId Node ID.
-         * @param weight Weight.
-         * @param floatWeight Floating point weight.
-         */
-        private WeightedNode(UUID nodeId, int weight, float floatWeight) {
-            this.nodeId = nodeId;
-            this.weight = weight;
-            this.floatWeight = floatWeight;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return nodeId.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compareTo(@NotNull WeightedNode other) {
-            float res = other.floatWeight - floatWeight;
-
-            return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
new file mode 100644
index 0000000..1413612
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Map-reduce plan.
+ */
+public class HadoopDefaultMapReducePlan implements HadoopMapReducePlan {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Mappers map. */
+    private Map<UUID, Collection<HadoopInputSplit>> mappers;
+
+    /** Reducers map. */
+    private Map<UUID, int[]> reducers;
+
+    /** Mappers count. */
+    private int mappersCnt;
+
+    /** Reducers count. */
+    private int reducersCnt;
+
+    /**
+     * @param mappers Mappers map.
+     * @param reducers Reducers map.
+     */
+    public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> mappers,
+        Map<UUID, int[]> reducers) {
+        this.mappers = mappers;
+        this.reducers = reducers;
+
+        if (mappers != null) {
+            for (Collection<HadoopInputSplit> splits : mappers.values())
+                mappersCnt += splits.size();
+        }
+
+        if (reducers != null) {
+            for (int[] rdcrs : reducers.values())
+                reducersCnt += rdcrs.length;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int mappers() {
+        return mappersCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int reducers() {
+        return reducersCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID nodeForReducer(int reducer) {
+        assert reducer >= 0 && reducer < reducersCnt : reducer;
+
+        for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
+            for (int r : entry.getValue()) {
+                if (r == reducer)
+                    return entry.getKey();
+            }
+        }
+
+        throw new IllegalStateException("Not found reducer index: " + reducer);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId) {
+        return mappers.get(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public int[] reducers(UUID nodeId) {
+        return reducers.get(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> mapperNodeIds() {
+        return mappers.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> reducerNodeIds() {
+        return reducers.keySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java
deleted file mode 100644
index 37073d9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.util.*;
-
-/**
- * Task to get job counters.
- */
-public class GridHadoopProtocolJobCountersTask extends GridHadoopProtocolTaskAdapter<GridHadoopCounters> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop,
-        GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
-
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-
-        assert nodeId != null;
-        assert id != null;
-
-        return hadoop.counters(new GridHadoopJobId(nodeId, id));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java
deleted file mode 100644
index de4f89c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-
-/**
- * Job status task.
- */
-public class GridHadoopProtocolJobStatusTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Default poll delay */
-    private static final long DFLT_POLL_DELAY = 100L;
-
-    /** Attribute for held status. */
-    private static final String ATTR_HELD = "held";
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop,
-        GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-        Long pollDelay = args.get(2);
-
-        assert nodeId != null;
-        assert id != null;
-
-        GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
-
-        if (pollDelay == null)
-            pollDelay = DFLT_POLL_DELAY;
-
-        if (pollDelay > 0) {
-            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
-
-            if (fut != null) {
-                if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true))
-                    return hadoop.status(jobId);
-                else {
-                    fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> fut0) {
-                            jobCtx.callcc();
-                        }
-                    });
-
-                    jobCtx.setAttribute(ATTR_HELD, true);
-
-                    return jobCtx.holdcc(pollDelay);
-                }
-            }
-            else
-                return null;
-        }
-        else
-            return hadoop.status(jobId);
-    }
-}


Mime
View raw message