ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [12/51] [partial] ignite git commit: IGNITE-3916: Initial impl.
Date Fri, 16 Sep 2016 11:20:55 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
deleted file mode 100644
index 090b336..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- * <p>
- * The object is serialized through {@link Externalizable}: {@link #writeExternal(ObjectOutput)} and
- * {@link #readExternal(ObjectInput)} must write and read the fields in exactly the same order.
- * <p>
- * The class performs no synchronization of its own and exposes its mutable collections directly.
- */
-public class HadoopJobMetadata implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    private HadoopJobId jobId;
-
-    /** Job info. */
-    private HadoopJobInfo jobInfo;
-
-    /** Node submitted job. */
-    private UUID submitNodeId;
-
-    /** Map-reduce plan. */
-    private HadoopMapReducePlan mrPlan;
-
-    /** Pending splits for which mapper should be executed, mapped to their task numbers. */
-    private Map<HadoopInputSplit, Integer> pendingSplits;
-
-    /** Pending reducers. */
-    private Collection<Integer> pendingReducers;
-
-    /** Reducers addresses. */
-    @GridToStringInclude
-    private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
-
-    /** Job phase. */
-    private HadoopJobPhase phase = PHASE_SETUP;
-
-    /** Fail cause. Only the first reported cause is kept. */
-    @GridToStringExclude
-    private Throwable failCause;
-
-    /** Version. Incremented by the copy constructor. */
-    private long ver;
-
-    /** Job counters */
-    private HadoopCounters counters = new HadoopCountersImpl();
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public HadoopJobMetadata() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param submitNodeId Submit node ID.
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     */
-    public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) {
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.submitNodeId = submitNodeId;
-    }
-
-    /**
-     * Copy constructor. Copies field references as is (no deep copy), so the new instance shares
-     * collections and counters with {@code src}. The version of the copy is the source version plus one.
-     *
-     * @param src Metadata to copy.
-     */
-    public HadoopJobMetadata(HadoopJobMetadata src) {
-        // Make sure to preserve alphabetic order.
-        counters = src.counters;
-        failCause = src.failCause;
-        jobId = src.jobId;
-        jobInfo = src.jobInfo;
-        mrPlan = src.mrPlan;
-        pendingSplits = src.pendingSplits;
-        pendingReducers = src.pendingReducers;
-        phase = src.phase;
-        reducersAddrs = src.reducersAddrs;
-        submitNodeId = src.submitNodeId;
-        ver = src.ver + 1;
-    }
-
-    /**
-     * @return Submit node ID.
-     */
-    public UUID submitNodeId() {
-        return submitNodeId;
-    }
-
-    /**
-     * @param phase Job phase.
-     */
-    public void phase(HadoopJobPhase phase) {
-        this.phase = phase;
-    }
-
-    /**
-     * @return Job phase.
-     */
-    public HadoopJobPhase phase() {
-        return phase;
-    }
-
-    /**
-     * Gets reducers addresses for external execution.
-     *
-     * @return Reducers addresses, {@code null} if they were never set.
-     */
-    public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
-        return reducersAddrs;
-    }
-
-    /**
-     * Sets reducers addresses for external execution.
-     *
-     * @param reducersAddrs Map of addresses.
-     */
-    public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
-        this.reducersAddrs = reducersAddrs;
-    }
-
-    /**
-     * Sets map of pending splits.
-     *
-     * @param pendingSplits Pending splits mapped to their task numbers.
-     */
-    public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
-        this.pendingSplits = pendingSplits;
-    }
-
-    /**
-     * Gets map of pending splits.
-     *
-     * @return Pending splits mapped to their task numbers, {@code null} if they were never set.
-     */
-    public Map<HadoopInputSplit, Integer> pendingSplits() {
-        return pendingSplits;
-    }
-
-    /**
-     * Sets collection of pending reducers.
-     *
-     * @param pendingReducers Collection of pending reducers.
-     */
-    public void pendingReducers(Collection<Integer> pendingReducers) {
-        this.pendingReducers = pendingReducers;
-    }
-
-    /**
-     * Gets collection of pending reducers.
-     *
-     * @return Collection of pending reducers, {@code null} if it was never set.
-     */
-    public Collection<Integer> pendingReducers() {
-        return pendingReducers;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * Sets the map-reduce plan. The plan may be assigned only once (checked by assertion).
-     *
-     * @param mrPlan Map-reduce plan.
-     */
-    public void mapReducePlan(HadoopMapReducePlan mrPlan) {
-        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
-        this.mrPlan = mrPlan;
-    }
-
-    /**
-     * @return Map-reduce plan.
-     */
-    public HadoopMapReducePlan mapReducePlan() {
-        return mrPlan;
-    }
-
-    /**
-     * @return Job info.
-     */
-    public HadoopJobInfo jobInfo() {
-        return jobInfo;
-    }
-
-    /**
-     * Returns job counters.
-     *
-     * @return Collection of counters.
-     */
-    public HadoopCounters counters() {
-        return counters;
-    }
-
-    /**
-     * Sets counters.
-     *
-     * @param counters Collection of counters.
-     */
-    public void counters(HadoopCounters counters) {
-        this.counters = counters;
-    }
-
-    /**
-     * Records the fail cause. If a cause has already been recorded, the new one is ignored.
-     *
-     * @param failCause Fail cause, must not be {@code null} (checked by assertion).
-     */
-    public void failCause(Throwable failCause) {
-        assert failCause != null;
-
-        if (this.failCause == null) // Keep the first error.
-            this.failCause = failCause;
-    }
-
-    /**
-     * @return Fail cause, {@code null} if no failure was recorded.
-     */
-    public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Version.
-     */
-    public long version() {
-        return ver;
-    }
-
-    /**
-     * Looks up the task number of a pending split.
-     *
-     * @param split Split.
-     * @return Task number.
-     * @throws NullPointerException If pending splits were never set or do not contain the split
-     *      (the boxed lookup result is unboxed to {@code int}).
-     */
-    public int taskNumber(HadoopInputSplit split) {
-        return pendingSplits.get(split);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        // Field order must match readExternal().
-        U.writeUuid(out, submitNodeId);
-        out.writeObject(jobId);
-        out.writeObject(jobInfo);
-        out.writeObject(mrPlan);
-        out.writeObject(pendingSplits);
-        out.writeObject(pendingReducers);
-        out.writeObject(phase);
-        out.writeObject(failCause);
-        out.writeLong(ver);
-        out.writeObject(reducersAddrs);
-        out.writeObject(counters);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        // Field order must match writeExternal().
-        submitNodeId = U.readUuid(in);
-        jobId = (HadoopJobId)in.readObject();
-        jobInfo = (HadoopJobInfo)in.readObject();
-        mrPlan = (HadoopMapReducePlan)in.readObject();
-        pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
-        pendingReducers = (Collection<Integer>)in.readObject();
-        phase = (HadoopJobPhase)in.readObject();
-        failCause = (Throwable)in.readObject();
-        ver = in.readLong();
-        reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
-        counters = (HadoopCounters)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        // Pending collections have no initializer and stay null until set explicitly,
-        // so their sizes are computed null-safely to keep toString() from throwing.
-        int pendingMaps = pendingSplits == null ? 0 : pendingSplits.size();
-        int pendingReduces = pendingReducers == null ? 0 : pendingReducers.size();
-
-        return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingMaps,
-            "pendingReduces", pendingReduces, "failCause", failCause == null ? null :
-                failCause.getClass().getName());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
deleted file mode 100644
index f3e17f3..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ /dev/null
@@ -1,1706 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.expiry.ModifiedExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
-import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
-import org.apache.ignite.internal.util.GridMutex;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CIX1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_COMPLETE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_MAP;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_REDUCE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.ABORT;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMMIT;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.SETUP;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.COMPLETED;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.RUNNING;
-
-/**
- * Hadoop job tracker.
- */
-public class HadoopJobTracker extends HadoopComponent {
-    /** Mutex guarding lazy initialization of the job metadata cache projections. */
-    private final GridMutex mux = new GridMutex();
-
-    /** Job metadata cache projection; lazily initialized on first access. */
-    private volatile IgniteInternalCache<HadoopJobId, HadoopJobMetadata> jobMetaPrj;
-
-    /**
-     * Projection with expiry policy for finished job updates. Same object as the plain job metadata
-     * projection when the configured finished job info TTL is not positive.
-     */
-    private volatile IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
-
-    /** Map-reduce execution planner. Assigned together with the job metadata projection. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private HadoopMapReducePlanner mrPlanner;
-
-    /** All the known jobs. */
-    private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJob>> jobs = new ConcurrentHashMap8<>();
-
-    /** Locally active jobs. */
-    private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
-
-    /** Locally requested finish futures. */
-    private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobId>> activeFinishFuts =
-        new ConcurrentHashMap8<>();
-
-    /** Event processing service. Created on start as a single-thread pool, shut down on kernal stop. */
-    private ExecutorService evtProcSvc;
-
-    /** Component busy lock. Read-locked by operations, write-locked (and never released) on kernal stop. */
-    private GridSpinReadWriteLock busyLock;
-
-    /** Class to create HadoopJob instances from. Loaded on start through a dedicated Hadoop class loader. */
-    private Class<? extends HadoopJob> jobCls;
-
-    /**
-     * Closure to check result of async transform of system cache. Waits for the future result and
-     * logs an error if the future completed with {@link IgniteCheckedException}; nothing is rethrown.
-     */
-    private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
-        @Override public void apply(IgniteInternalFuture<?> gridFut) {
-            try {
-                gridFut.get(); // Result is discarded, only a failure is of interest.
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to transform system cache.", e);
-            }
-        }
-    };
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Creates the busy lock and the single-thread event processing service, then loads the job class
-     * through a dedicated {@link HadoopClassLoader}.
-     */
-    @SuppressWarnings("unchecked")
-    @Override public void start(final HadoopContext ctx) throws IgniteCheckedException {
-        super.start(ctx);
-
-        busyLock = new GridSpinReadWriteLock();
-        evtProcSvc = Executors.newFixedThreadPool(1);
-
-        UUID locNodeId = ctx.localNodeId();
-
-        assert jobCls == null;
-
-        // Native library names are taken from the configuration when one is present.
-        String[] nativeLibs = ctx.configuration() != null ? ctx.configuration().getNativeLibraryNames() : null;
-
-        HadoopClassLoader jobLdr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(locNodeId), nativeLibs);
-
-        String jobClsName = HadoopV2Job.class.getName();
-
-        try {
-            jobCls = (Class<HadoopV2Job>)jobLdr.loadClass(jobClsName);
-        }
-        catch (Exception ioe) {
-            throw new IgniteCheckedException("Failed to load job class [class=" + jobClsName + ']', ioe);
-        }
-    }
-
-    /**
-     * Gets the job metadata cache projection, initializing it on first access.
-     * <p>
-     * Initialization happens once under {@code mux}: the system cache is resolved, the map-reduce
-     * planner is obtained and injected with resources, and both the plain projection and the
-     * projection for finished jobs are created.
-     *
-     * @return Job meta projection.
-     * @throws IllegalStateException If resource injection into the planner failed.
-     */
-    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-    private IgniteInternalCache<HadoopJobId, HadoopJobMetadata> jobMetaCache() {
-        IgniteInternalCache<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;
-
-        if (prj == null) {
-            synchronized (mux) {
-                if ((prj = jobMetaPrj) == null) {
-                    GridCacheAdapter<HadoopJobId, HadoopJobMetadata> sysCache = ctx.kernalContext().cache()
-                        .internalCache(CU.SYS_CACHE_HADOOP_MR);
-
-                    assert sysCache != null;
-
-                    mrPlanner = ctx.planner();
-
-                    try {
-                        ctx.kernalContext().resource().injectGeneric(mrPlanner);
-                    }
-                    catch (IgniteCheckedException e) { // Must not happen.
-                        U.error(log, "Failed to inject resources.", e);
-
-                        throw new IllegalStateException(e);
-                    }
-
-                    prj = sysCache;
-
-                    if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
-                        ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
-                            new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl()));
-
-                        finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
-                    }
-                    else
-                        finishedJobMetaPrj = prj;
-
-                    // Publish the main projection last. Readers use a non-null jobMetaPrj as the signal
-                    // that initialization is complete and skip the synchronized block, so everything
-                    // else (planner, finished-job projection) must already be assigned at this point.
-                    jobMetaPrj = prj;
-                }
-            }
-        }
-
-        return prj;
-    }
-
-    /**
-     * Gets the projection used for finished job updates, triggering initialization of the
-     * job metadata projections if it has not been assigned yet.
-     *
-     * @return Projection with expiry policy for finished job updates.
-     */
-    private IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedJobMetaCache() {
-        IgniteInternalCache<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaPrj;
-
-        if (cache != null)
-            return cache;
-
-        // Not assigned yet: run the shared initialization and read the field again.
-        jobMetaCache();
-
-        cache = finishedJobMetaPrj;
-
-        assert cache != null;
-
-        return cache;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Registers a continuous query listener for job metadata updates and a local listener for
-     * node left/failed events. Both hand the actual processing over to the event processing service.
-     */
-    @SuppressWarnings("deprecation")
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata> metaLsnr =
-            new CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata>() {
-                @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends HadoopJobId,
-                    ? extends HadoopJobMetadata>> evts) {
-                    if (busyLock.tryReadLock()) {
-                        try {
-                            // Must process query callback in a separate thread to avoid deadlocks.
-                            evtProcSvc.submit(new EventHandler() {
-                                @Override protected void body() throws IgniteCheckedException {
-                                    processJobMetadataUpdates(evts);
-                                }
-                            });
-                        }
-                        finally {
-                            busyLock.readUnlock();
-                        }
-                    }
-                }
-            };
-
-        jobMetaCache().context().continuousQueries().executeInternalQuery(metaLsnr, null, true, true, false);
-
-        GridLocalEventListener discoLsnr = new GridLocalEventListener() {
-            @Override public void onEvent(final Event evt) {
-                if (busyLock.tryReadLock()) {
-                    try {
-                        // Must process discovery callback in a separate thread to avoid deadlock.
-                        evtProcSvc.submit(new EventHandler() {
-                            @Override protected void body() {
-                                processNodeLeft((DiscoveryEvent)evt);
-                            }
-                        });
-                    }
-                    finally {
-                        busyLock.readUnlock();
-                    }
-                }
-            }
-        };
-
-        ctx.kernalContext().event().addLocalEventListener(discoLsnr, EventType.EVT_NODE_FAILED,
-            EventType.EVT_NODE_LEFT);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        busyLock.writeLock();
-
-        evtProcSvc.shutdown();
-
-        // Fail all pending futures.
-        for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values())
-            fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping)."));
-    }
-
-    /**
-     * Submits execution of Hadoop job to grid.
-     * <p>
-     * Builds the map-reduce plan and the initial job metadata, registers a local completion future
-     * and stores the metadata in the job metadata cache. If submission does not succeed, the
-     * registered completion future is removed again, so no orphan future is left behind.
-     *
-     * @param jobId Job ID.
-     * @param info Job info.
-     * @return Job completion future. A finished future carrying the error is returned if the grid is
-     *      stopping, a job with the same ID already exists or submission failed.
-     */
-    @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<HadoopJobId> submit(HadoopJobId jobId, HadoopJobInfo info) {
-        if (!busyLock.tryReadLock()) {
-            return new GridFinishedFuture<>(new IgniteCheckedException("Failed to execute map-reduce job " +
-                "(grid is stopping): " + info));
-        }
-
-        // Completion future once it has been registered in activeFinishFuts, null before that.
-        GridFutureAdapter<HadoopJobId> completeFut = null;
-
-        // Becomes true only after the metadata has been stored in the cache.
-        boolean submitted = false;
-
-        try {
-            long jobPrepare = U.currentTimeMillis();
-
-            if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
-                throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
-
-            HadoopJob job = job(jobId, info);
-
-            HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
-
-            HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
-
-            meta.mapReducePlan(mrPlan);
-
-            meta.pendingSplits(allSplits(mrPlan));
-            meta.pendingReducers(allReducers(mrPlan));
-
-            completeFut = new GridFutureAdapter<>();
-
-            GridFutureAdapter<HadoopJobId> old = activeFinishFuts.put(jobId, completeFut);
-
-            assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']';
-
-            if (log.isDebugEnabled())
-                log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']');
-
-            long jobStart = U.currentTimeMillis();
-
-            HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(),
-                ctx.localNodeId());
-
-            perfCntr.clientSubmissionEvents(info);
-            perfCntr.onJobPrepare(jobPrepare);
-            perfCntr.onJobStart(jobStart);
-
-            if (jobMetaCache().getAndPutIfAbsent(jobId, meta) != null)
-                throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
-
-            submitted = true;
-
-            return completeFut;
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to submit job: " + jobId, e);
-
-            return new GridFinishedFuture<>(e);
-        }
-        finally {
-            // The caller gets a different (failed) future or an exception in this case, so the
-            // registered one would never be observed or completed: drop it, but only if it is still ours.
-            if (!submitted && completeFut != null)
-                activeFinishFuts.remove(jobId, completeFut);
-
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * Convert Hadoop job metadata to job status.
-     *
-     * @param meta Metadata.
-     * @return Status.
-     */
-    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public static HadoopJobStatus status(HadoopJobMetadata meta) {
-        HadoopJobInfo jobInfo = meta.jobInfo();
-
-        return new HadoopJobStatus(
-            meta.jobId(),
-            jobInfo.jobName(),
-            jobInfo.user(),
-            meta.pendingSplits() != null ? meta.pendingSplits().size() : 0,
-            meta.pendingReducers() != null ? meta.pendingReducers().size() : 0,
-            meta.mapReducePlan().mappers(),
-            meta.mapReducePlan().reducers(),
-            meta.phase(),
-            meta.failCause() != null,
-            meta.version()
-        );
-    }
-
-    /**
-     * Looks up job metadata by job ID and converts it to a job status.
-     *
-     * @param jobId Job ID to get status for.
-     * @return Job status for given job ID or {@code null} if job was not found or the grid is stopping.
-     * @throws IgniteCheckedException If the metadata lookup failed.
-     */
-    @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
-        if (busyLock.tryReadLock()) {
-            try {
-                HadoopJobMetadata jobMeta = jobMetaCache().get(jobId);
-
-                if (jobMeta == null)
-                    return null;
-
-                return status(jobMeta);
-            }
-            finally {
-                busyLock.readUnlock();
-            }
-        }
-
-        return null; // Grid is stopping.
-    }
-
-    /**
-     * Gets job finish future.
-     * <p>
-     * If the job is already in the complete phase, a finished future is returned right away. Otherwise
-     * a future is registered in (or taken from) the locally requested finish futures and the metadata
-     * is read a second time, so that a completion which happened in between is not missed.
-     * <p>
-     * NOTE(review): the early "already complete" path returns a future finished with the job ID only,
-     * while the re-check path also passes the job fail cause to {@code onDone} - confirm whether the
-     * difference is intended.
-     *
-     * @param jobId Job ID.
-     * @return Finish future or {@code null} if the job metadata was not found or the grid is stopping.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
-        if (!busyLock.tryReadLock())
-            return null; // Grid is stopping.
-
-        try {
-            HadoopJobMetadata meta = jobMetaCache().get(jobId);
-
-            if (meta == null)
-                return null;
-
-            if (log.isTraceEnabled())
-                log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
-
-            if (meta.phase() == PHASE_COMPLETE) {
-                if (log.isTraceEnabled())
-                    log.trace("Job is complete, returning finished future: " + jobId);
-
-                return new GridFinishedFuture<>(jobId); // Fail cause is not propagated on this path.
-            }
-
-            GridFutureAdapter<HadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId,
-                new GridFutureAdapter<HadoopJobId>());
-
-            // Get meta from cache one more time to close the window.
-            meta = jobMetaCache().get(jobId);
-
-            if (log.isTraceEnabled())
-                log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
-
-            if (meta == null) { // Metadata disappeared between the two reads.
-                fut.onDone();
-
-                activeFinishFuts.remove(jobId , fut); // Removes the entry only if it still maps to this future.
-            }
-            else if (meta.phase() == PHASE_COMPLETE) { // Job completed between the two reads.
-                fut.onDone(jobId, meta.failCause());
-
-                activeFinishFuts.remove(jobId , fut);
-            }
-
-            return fut;
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * Reads job metadata from the cache and returns the map-reduce plan stored in it.
-     *
-     * @param jobId Job ID.
-     * @return Job plan or {@code null} if the job metadata was not found or the grid is stopping.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException {
-        if (busyLock.tryReadLock()) {
-            try {
-                HadoopJobMetadata jobMeta = jobMetaCache().get(jobId);
-
-                return jobMeta == null ? null : jobMeta.mapReducePlan();
-            }
-            finally {
-                busyLock.readUnlock();
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * Callback from task executor invoked when a task has been finished.
-     * <p>
-     * Does nothing if the grid is stopping. SETUP, MAP, REDUCE and COMBINE tasks are dispatched to the
-     * local state of the job; for COMMIT and ABORT the job phase is switched to complete through an
-     * asynchronous cache update on the finished jobs projection. For a COMPLETED task the task
-     * counters are passed along as an increment-counters processor, otherwise {@code null} is passed.
-     * <p>
-     * NOTE(review): the local job state may be {@code null} (the assertion allows that on the job
-     * update leader for COMMIT/ABORT). The SETUP/MAP/REDUCE/COMBINE branches dereference it without a
-     * check, so they rely on that assertion holding - confirm behavior when assertions are disabled.
-     *
-     * @param info Task info.
-     * @param status Task status. Must not be in RUNNING state (checked by assertion).
-     */
-    @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
-    public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) {
-        if (!busyLock.tryReadLock())
-            return;
-
-        try {
-            assert status.state() != RUNNING;
-
-            if (log.isDebugEnabled())
-                log.debug("Received task finished callback [info=" + info + ", status=" + status + ']');
-
-            JobLocalState state = activeJobs.get(info.jobId());
-
-            // Task CRASHes with null fail cause.
-            assert (status.state() != FAILED) || status.failCause() != null :
-                "Invalid task status [info=" + info + ", status=" + status + ']';
-
-            assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)):
-                "Missing local state for finished task [info=" + info + ", status=" + status + ']';
-
-            StackedProcessor incrCntrs = null;
-
-            if (status.state() == COMPLETED)
-                incrCntrs = new IncrementCountersProcessor(null, status.counters());
-
-            switch (info.type()) {
-                case SETUP: {
-                    state.onSetupFinished(info, status, incrCntrs);
-
-                    break;
-                }
-
-                case MAP: {
-                    state.onMapFinished(info, status, incrCntrs);
-
-                    break;
-                }
-
-                case REDUCE: {
-                    state.onReduceFinished(info, status, incrCntrs);
-
-                    break;
-                }
-
-                case COMBINE: {
-                    state.onCombineFinished(info, status, incrCntrs);
-
-                    break;
-                }
-
-                case COMMIT:
-                case ABORT: {
-                    IgniteInternalCache<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache();
-
-                    cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
-                        listen(failsLog); // Failures of the async update are only logged.
-
-                    break;
-                }
-            }
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * Asynchronously applies the given entry processor to the metadata entry of a job and hands the
-     * resulting future to {@code failsLog}.
-     *
-     * @param jobId Job id.
-     * @param c Closure of operation.
-     */
-    private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) {
-        IgniteInternalFuture<?> updateFut = jobMetaCache().invokeAsync(jobId, c);
-
-        updateFut.listen(failsLog);
-    }
-
-    /**
-     * Callback from task executor called when a process is ready to receive shuffle messages.
-     * Records the process descriptor for the given reducers in the job metadata.
-     *
-     * @param jobId Job ID.
-     * @param reducers Reducers.
-     * @param desc Process descriptor.
-     */
-    public void onExternalMappersInitialized(HadoopJobId jobId, Collection<Integer> reducers,
-        HadoopProcessDescriptor desc) {
-        InitializeReducersProcessor initProc = new InitializeReducersProcessor(null, reducers, desc);
-
-        transform(jobId, initProc);
-    }
-
-    /**
-     * Collects every input split of the given map-reduce plan and assigns each one a sequential
-     * task number, starting from zero, in plan iteration order.
-     *
-     * @param plan Map-reduce plan.
-     * @return Map from input split to its task number.
-     * @throws IllegalStateException If the same split occurs in the plan more than once.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private Map<HadoopInputSplit, Integer> allSplits(HadoopMapReducePlan plan) {
-        Map<HadoopInputSplit, Integer> splitNums = new HashMap<>();
-
-        int nextNum = 0;
-
-        for (UUID nodeId : plan.mapperNodeIds()) {
-            for (HadoopInputSplit split : plan.mappers(nodeId)) {
-                Integer prevNum = splitNums.put(split, nextNum);
-
-                nextNum++;
-
-                if (prevNum != null)
-                    throw new IllegalStateException("Split duplicate.");
-            }
-        }
-
-        return splitNums;
-    }
-
-    /**
-     * Builds the set of reducer numbers of a job: every integer from zero (inclusive) up to
-     * {@code plan.reducers()} (exclusive).
-     *
-     * @param plan Map-reduce plan.
-     * @return Collection of reducers.
-     */
-    private Collection<Integer> allReducers(HadoopMapReducePlan plan) {
-        Collection<Integer> rdcNums = new HashSet<>();
-
-        int rdc = 0;
-
-        while (rdc < plan.reducers()) {
-            rdcNums.add(rdc);
-
-            rdc++;
-        }
-
-        return rdcNums;
-    }
-
-    /**
-     * Processes node leave (or fail) event.
-     * <p>
-     * Only the job update leader reacts. For a job still in setup phase with no local state, the setup
-     * task is re-run when the order of the event node is lower than the local node order. For a job in
-     * map or reduce phase, the splits and reducers planned on nodes that discovery no longer knows are
-     * collected and the job is cancelled.
-     *
-     * @param evt Discovery event.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private void processNodeLeft(DiscoveryEvent evt) {
-        if (log.isDebugEnabled())
-            log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']');
-
-        // Check only if this node is responsible for job status updates.
-        if (!ctx.jobUpdateLeader())
-            return;
-
-        boolean failoverSetup = evt.eventNode().order() < ctx.localNodeOrder();
-
-        // Iteration over all local entries is correct since system cache is REPLICATED.
-        for (Object val : jobMetaCache().values()) {
-            HadoopJobMetadata meta = (HadoopJobMetadata)val;
-
-            HadoopJobId jobId = meta.jobId();
-            HadoopMapReducePlan plan = meta.mapReducePlan();
-            HadoopJobPhase phase = meta.phase();
-
-            try {
-                if (failoverSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
-                    // Failover setup task.
-                    HadoopJob job = job(jobId, meta.jobInfo());
-
-                    Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
-
-                    assert setupTask != null;
-
-                    ctx.taskExecutor().run(job, setupTask);
-
-                    continue;
-                }
-
-                if (phase != PHASE_MAP && phase != PHASE_REDUCE)
-                    continue;
-
-                // Every planned node is checked, not just the event node, because several nodes
-                // may have failed at once.
-                Collection<HadoopInputSplit> lostSplits = null;
-
-                for (UUID nodeId : plan.mapperNodeIds()) {
-                    if (ctx.kernalContext().discovery().node(nodeId) != null)
-                        continue;
-
-                    // Node has left the grid.
-                    if (lostSplits == null)
-                        lostSplits = new HashSet<>();
-
-                    lostSplits.addAll(plan.mappers(nodeId));
-                }
-
-                Collection<Integer> lostReducers = null;
-
-                for (UUID nodeId : plan.reducerNodeIds()) {
-                    if (ctx.kernalContext().discovery().node(nodeId) != null)
-                        continue;
-
-                    // Node has left the grid.
-                    if (lostReducers == null)
-                        lostReducers = new HashSet<>();
-
-                    for (int rdc : plan.reducers(nodeId))
-                        lostReducers.add(rdc);
-                }
-
-                if (lostSplits == null && lostReducers == null)
-                    continue;
-
-                jobMetaCache().invoke(jobId, new CancelJobProcessor(null, new IgniteCheckedException(
-                    "One or more nodes participating in map-reduce job execution failed."), lostSplits,
-                    lostReducers));
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to cancel job: " + meta, e);
-            }
-        }
-    }
-
-    /**
-     * Handles a batch of job metadata cache updates.
-     * <p>
-     * Entries without a value, or for jobs the context reports as not participating, are skipped. For
-     * the rest the task executor is notified first; if that notification fails the job is cancelled,
-     * otherwise the update is processed according to the job phase.
-     *
-     * @param updated Updated cache entries.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void processJobMetadataUpdates(
-        Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> updated)
-        throws IgniteCheckedException {
-        UUID locNodeId = ctx.localNodeId();
-
-        for (CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata> evt : updated) {
-            HadoopJobId jobId = evt.getKey();
-            HadoopJobMetadata meta = evt.getValue();
-
-            boolean relevant = meta != null && ctx.isParticipating(meta);
-
-            if (!relevant)
-                continue;
-
-            if (log.isDebugEnabled())
-                log.debug("Processing job metadata update callback [locNodeId=" + locNodeId +
-                    ", meta=" + meta + ']');
-
-            try {
-                ctx.taskExecutor().onJobStateChanged(meta);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to process job state changed callback (will fail the job) " +
-                    "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);
-
-                // The executor could not handle the new state: cancel the job and move on.
-                transform(jobId, new CancelJobProcessor(null, e));
-
-                continue;
-            }
-
-            processJobMetaUpdate(jobId, meta, locNodeId);
-        }
-    }
-
-    /**
-     * Logs the map-reduce plan of a job at info level: one line with the number of mapper splits per
-     * node and one line with the reducer numbers per node.
-     *
-     * @param jobId  Job ID.
-     * @param plan Map-reduce plan.
-     */
-    @SuppressWarnings({"unused", "ConstantConditions" })
-    private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) {
-        log.info("Plan for " + jobId);
-
-        SB mapLine = new SB();
-
-        mapLine.a("   Map: ");
-
-        for (UUID nodeId : plan.mapperNodeIds()) {
-            int splitCnt = plan.mappers(nodeId).size();
-
-            mapLine.a(nodeId).a("=").a(splitCnt).a(' ');
-        }
-
-        log.info(mapLine.toString());
-
-        SB rdcLine = new SB();
-
-        rdcLine.a("   Reduce: ");
-
-        for (UUID nodeId : plan.reducerNodeIds()) {
-            String rdcNums = Arrays.toString(plan.reducers(nodeId));
-
-            rdcLine.a(nodeId).a("=").a(rdcNums).a(' ');
-        }
-
-        log.info(rdcLine.toString());
-    }
-
-    /**
-     * Reacts to a single job metadata update according to the phase stored in the metadata:
-     * submits setup, map, reduce, commit or abort tasks to the task executor, cancels local tasks,
-     * or cleans up local structures once the job is complete.
-     *
-     * @param jobId Job ID.
-     * @param meta Job metadata.
-     * @param locNodeId Local node ID.
-     * @throws IgniteCheckedException If failed.
-     * @throws IllegalStateException If the metadata carries a phase not handled here.
-     */
-    private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId)
-        throws IgniteCheckedException {
-        // Local state may be absent if nothing was scheduled on this node for the job yet.
-        JobLocalState state = activeJobs.get(jobId);
-
-        HadoopJob job = job(jobId, meta.jobInfo());
-
-        HadoopMapReducePlan plan = meta.mapReducePlan();
-
-        switch (meta.phase()) {
-            case PHASE_SETUP: {
-                // Only the job update leader runs the setup task; setupTask() yields null when
-                // local state for the job already exists, so the task is not submitted twice.
-                if (ctx.jobUpdateLeader()) {
-                    Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
-
-                    if (setupTask != null)
-                        ctx.taskExecutor().run(job, setupTask);
-                }
-
-                break;
-            }
-
-            case PHASE_MAP: {
-                // Check if we should initiate new task on local node.
-                Collection<HadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta);
-
-                if (tasks != null)
-                    ctx.taskExecutor().run(job, tasks);
-
-                break;
-            }
-
-            case PHASE_REDUCE: {
-                // No reducers left pending: the leader submits the single COMMIT task.
-                if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) {
-                    HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId +
-                                ", jobId=" + jobId + ']');
-
-                    ctx.taskExecutor().run(job, Collections.singletonList(info));
-
-                    break;
-                }
-
-                // Otherwise submit the reducers planned for the local node that were not added yet.
-                Collection<HadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job);
-
-                if (tasks != null)
-                    ctx.taskExecutor().run(job, tasks);
-
-                break;
-            }
-
-            case PHASE_CANCELLING: {
-                // Prevent multiple task executor notification.
-                if (state != null && state.onCancel()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Cancelling local task execution for job: " + meta);
-
-                    ctx.taskExecutor().cancelTasks(jobId);
-                }
-
-                // Nothing is pending any more: the leader submits the ABORT task (once).
-                if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) {
-                    if (ctx.jobUpdateLeader()) {
-                        // The leader may have no local state if it ran no tasks of this job.
-                        if (state == null)
-                            state = initState(jobId);
-
-                        // Prevent running multiple abort tasks.
-                        if (state.onAborted()) {
-                            HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId +
-                                        ", jobId=" + jobId + ']');
-
-                            ctx.taskExecutor().run(job, Collections.singletonList(info));
-                        }
-                    }
-
-                    break;
-                }
-                else {
-                    // Check if there are unscheduled mappers or reducers.
-                    // Splits and reducers planned for this node that were never scheduled locally
-                    // are passed to CancelJobProcessor via transform().
-                    Collection<HadoopInputSplit> cancelMappers = new ArrayList<>();
-                    Collection<Integer> cancelReducers = new ArrayList<>();
-
-                    Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
-
-                    if (mappers != null) {
-                        for (HadoopInputSplit b : mappers) {
-                            if (state == null || !state.mapperScheduled(b))
-                                cancelMappers.add(b);
-                        }
-                    }
-
-                    int[] rdc = plan.reducers(ctx.localNodeId());
-
-                    if (rdc != null) {
-                        for (int r : rdc) {
-                            if (state == null || !state.reducerScheduled(r))
-                                cancelReducers.add(r);
-                        }
-                    }
-
-                    if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
-                        transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
-                }
-
-                break;
-            }
-
-            case PHASE_COMPLETE: {
-                if (log.isDebugEnabled())
-                    log.debug("Job execution is complete, will remove local state from active jobs " +
-                        "[jobId=" + jobId + ", meta=" + meta + ']');
-
-                // Drop local state and notify shuffle only if this node had state for the job.
-                if (state != null) {
-                    state = activeJobs.remove(jobId);
-
-                    assert state != null;
-
-                    ctx.shuffle().jobFinished(jobId);
-                }
-
-                // Complete the finish future (if any) with the job's fail cause from the metadata.
-                GridFutureAdapter<HadoopJobId> finishFut = activeFinishFuts.remove(jobId);
-
-                if (finishFut != null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']');
-
-                    finishFut.onDone(jobId, meta.failCause());
-                }
-
-                assert job != null;
-
-                if (ctx.jobUpdateLeader())
-                    job.cleanupStagingDirectory();
-
-                jobs.remove(jobId);
-
-                // On the leader, an optional counter writer named by a job property is loaded
-                // through the job's class loader and given the job counters from the metadata.
-                if (ctx.jobUpdateLeader()) {
-                    ClassLoader ldr = job.getClass().getClassLoader();
-
-                    try {
-                        String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY);
-
-                        if (statWriterClsName != null) {
-                            Class<?> cls = ldr.loadClass(statWriterClsName);
-
-                            HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance();
-
-                            HadoopCounters cntrs = meta.counters();
-
-                            writer.write(job, cntrs);
-                        }
-                    }
-                    catch (Exception e) {
-                        // A failure to write statistics is logged only; cleanup continues below.
-                        log.error("Can't write statistic due to: ", e);
-                    }
-                }
-
-                job.dispose(false);
-
-                break;
-            }
-
-            default:
-                throw new IllegalStateException("Unknown phase: " + meta.phase());
-        }
-    }
-
-    /**
-     * Creates the setup task of a job and initializes local state for it, unless local state for the
-     * job already exists.
-     *
-     * @param jobId Job ID.
-     * @return Setup task wrapped in collection, or {@code null} if the job is already active locally.
-     */
-    @Nullable private Collection<HadoopTaskInfo> setupTask(HadoopJobId jobId) {
-        if (!activeJobs.containsKey(jobId)) {
-            initState(jobId);
-
-            HadoopTaskInfo setupInfo = new HadoopTaskInfo(SETUP, jobId, 0, 0, null);
-
-            return Collections.singleton(setupInfo);
-        }
-
-        return null;
-    }
-
-    /**
-     * Creates MAP task infos for those of the given splits that were not yet added to the local state
-     * of the job. Local state is initialized on demand.
-     *
-     * @param mappers Mapper blocks (may be {@code null}).
-     * @param meta Job metadata.
-     * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
-     */
-    private Collection<HadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) {
-        UUID locNodeId = ctx.localNodeId();
-        HadoopJobId jobId = meta.jobId();
-
-        JobLocalState locState = activeJobs.get(jobId);
-
-        if (mappers == null)
-            return null;
-
-        if (locState == null)
-            locState = initState(jobId);
-
-        Collection<HadoopTaskInfo> res = null;
-
-        for (HadoopInputSplit split : mappers) {
-            // Skip splits that were already added to the local state.
-            if (!locState.addMapper(split))
-                continue;
-
-            if (log.isDebugEnabled())
-                log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId +
-                    ", split=" + split + ']');
-
-            HadoopTaskInfo mapInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);
-
-            if (res == null)
-                res = new ArrayList<>();
-
-            res.add(mapInfo);
-        }
-
-        return res;
-    }
-
-    /**
-     * Creates REDUCE task infos for those of the given reducers that were not yet added to the local
-     * state of the job. Local state is initialized on demand.
-     *
-     * @param reducers Reducers (may be {@code null}).
-     * @param job Job instance.
-     * @return Collection of task infos or {@code null} if no new reducer tasks were created.
-     */
-    private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) {
-        UUID locNodeId = ctx.localNodeId();
-        HadoopJobId jobId = job.id();
-
-        JobLocalState locState = activeJobs.get(jobId);
-
-        if (reducers == null)
-            return null;
-
-        if (locState == null)
-            locState = initState(job.id());
-
-        Collection<HadoopTaskInfo> res = null;
-
-        for (int rdc : reducers) {
-            // Skip reducers that were already added to the local state.
-            if (!locState.addReducer(rdc))
-                continue;
-
-            if (log.isDebugEnabled())
-                log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId +
-                    ", rdc=" + rdc + ']');
-
-            HadoopTaskInfo rdcInfo = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null);
-
-            if (res == null)
-                res = new ArrayList<>();
-
-            res.add(rdcInfo);
-        }
-
-        return res;
-    }
-
-    /**
-     * Registers a fresh local state for the given job in {@code activeJobs} via
-     * {@code F.addIfAbsent}.
-     *
-     * @param jobId Job ID.
-     * @return Local state.
-     */
-    private JobLocalState initState(HadoopJobId jobId) {
-        JobLocalState fresh = new JobLocalState();
-
-        return F.addIfAbsent(activeJobs, jobId, fresh);
-    }
-
-    /**
-     * Gets or creates job instance.
-     * <p>
-     * The first caller for a job ID registers a future in {@code jobs} and creates the job; every other
-     * caller waits on that future. If creation fails for any reason (checked or unchecked), the future
-     * is completed with the failure and removed from {@code jobs}, so that waiting callers are released
-     * and a later call can try again.
-     *
-     * @param jobId Job ID.
-     * @param jobInfo Job info; when {@code null} it is read from the job metadata cache.
-     * @return Job.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
-        GridFutureAdapter<HadoopJob> fut = jobs.get(jobId);
-
-        if (fut != null)
-            return fut.get();
-
-        // Keep a reference to the future registered by this call: reading it back from the map could
-        // observe a concurrent removal or replacement of the entry.
-        GridFutureAdapter<HadoopJob> newFut = new GridFutureAdapter<>();
-
-        fut = jobs.putIfAbsent(jobId, newFut);
-
-        if (fut != null)
-            return fut.get();
-
-        HadoopJob job = null;
-
-        try {
-            if (jobInfo == null) {
-                HadoopJobMetadata meta = jobMetaCache().get(jobId);
-
-                if (meta == null)
-                    throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);
-
-                jobInfo = meta.jobInfo();
-            }
-
-            job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames());
-
-            job.initialize(false, ctx.localNodeId());
-
-            newFut.onDone(job);
-
-            return job;
-        }
-        catch (IgniteCheckedException | RuntimeException | Error e) {
-            // Unchecked failures must complete the future too, otherwise every other caller
-            // for this job ID would block in fut.get() forever.
-            newFut.onDone(e);
-
-            jobs.remove(jobId, newFut);
-
-            if (job != null) {
-                try {
-                    job.dispose(false);
-                }
-                catch (IgniteCheckedException e0) {
-                    U.error(log, "Failed to dispose job: " + jobId, e0);
-                }
-            }
-
-            throw e;
-        }
-    }
-
-    /**
-     * Kills job.
-     * <p>
-     * Unless the job is unknown, already complete or already cancelling, its metadata is switched to
-     * cancellation with a {@code HadoopTaskCancelledException}. The method then waits on the finish
-     * future of the job and reports success only if that future failed with an exception whose cause
-     * is a {@code HadoopTaskCancelledException}.
-     *
-     * @param jobId Job ID.
-     * @return {@code True} if job was killed.
-     * @throws IgniteCheckedException If failed.
-     */
-    public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException {
-        if (!busyLock.tryReadLock())
-            return false; // Grid is stopping.
-
-        try {
-            HadoopJobMetadata meta = jobMetaCache().get(jobId);
-
-            boolean cancellable = meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING;
-
-            if (cancellable) {
-                HadoopTaskCancelledException cancelErr = new HadoopTaskCancelledException("Job cancelled.");
-
-                jobMetaCache().invoke(jobId, new CancelJobProcessor(null, cancelErr));
-            }
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-
-        IgniteInternalFuture<?> finishFut = finishFuture(jobId);
-
-        if (finishFut == null)
-            return false;
-
-        try {
-            finishFut.get();
-        }
-        catch (Exception e) {
-            // Any other failure of the finish future is treated as "not killed".
-            return e.getCause() instanceof HadoopTaskCancelledException;
-        }
-
-        return false;
-    }
-
-    /**
-     * Reads the counters of a job from the job metadata cache.
-     *
-     * @param jobId Job identifier.
-     * @return Job counters or {@code null} if job cannot be found or the busy lock cannot be acquired.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException {
-        if (!busyLock.tryReadLock())
-            return null;
-
-        try {
-            final HadoopJobMetadata meta = jobMetaCache().get(jobId);
-
-            if (meta == null)
-                return null;
-
-            return meta.counters();
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /**
-     * Base class for event handlers whose body runs under the busy read lock. The body is skipped
-     * when the lock cannot be acquired; anything it throws is logged, and errors are re-thrown.
-     */
-    private abstract class EventHandler implements Runnable {
-        /** {@inheritDoc} */
-        @Override public void run() {
-            if (busyLock.tryReadLock()) {
-                try {
-                    body();
-                }
-                catch (Throwable t) {
-                    U.error(log, "Unhandled exception while processing event.", t);
-
-                    // Exceptions are only logged, but errors are propagated to the caller.
-                    if (t instanceof Error)
-                        throw (Error)t;
-                }
-                finally {
-                    busyLock.readUnlock();
-                }
-            }
-        }
-
-        /**
-         * Handler body.
-         *
-         * @throws Exception If failed.
-         */
-        protected abstract void body() throws Exception;
-    }
-
-    /**
-     * State of a single job kept on the local node: the mapper splits and reducers that were added
-     * here, the number of finished mappers, and the cancelled/aborted flags. It also translates
-     * finished-task callbacks into job metadata updates.
-     * <p>
-     * NOTE(review): the collections and flags are plain (unsynchronized) fields; presumably callers
-     * serialize access to an instance - confirm.
-     */
-    private class JobLocalState {
-        /** Mappers. */
-        private final Collection<HadoopInputSplit> currMappers = new HashSet<>();
-
-        /** Reducers. */
-        private final Collection<Integer> currReducers = new HashSet<>();
-
-        /** Number of completed mappers. */
-        private final AtomicInteger completedMappersCnt = new AtomicInteger();
-
-        /** Cancelled flag. */
-        private boolean cancelled;
-
-        /** Aborted flag. */
-        private boolean aborted;
-
-        /**
-         * @param mapSplit Map split to add.
-         * @return {@code True} if mapper was added, {@code false} if it was already present.
-         */
-        private boolean addMapper(HadoopInputSplit mapSplit) {
-            return currMappers.add(mapSplit);
-        }
-
-        /**
-         * @param rdc Reducer number to add.
-         * @return {@code True} if reducer was added, {@code false} if it was already present.
-         */
-        private boolean addReducer(int rdc) {
-            return currReducers.add(rdc);
-        }
-
-        /**
-         * Checks whether this split was added to the local state.
-         *
-         * @param mapSplit Map split to check.
-         * @return {@code True} if mapper was scheduled.
-         */
-        public boolean mapperScheduled(HadoopInputSplit mapSplit) {
-            return currMappers.contains(mapSplit);
-        }
-
-        /**
-         * Checks whether this reducer was added to the local state.
-         *
-         * @param rdc Reducer number to check.
-         * @return {@code True} if reducer was scheduled.
-         */
-        public boolean reducerScheduled(int rdc) {
-            return currReducers.contains(rdc);
-        }
-
-        /**
-         * Handles a finished SETUP task: cancels the job if the task failed or crashed, otherwise
-         * moves the job to the map phase.
-         *
-         * @param taskInfo Task info.
-         * @param status Task status.
-         * @param prev Previous closure.
-         */
-        private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
-            final HadoopJobId jobId = taskInfo.jobId();
-
-            if (status.state() == FAILED || status.state() == CRASHED)
-                transform(jobId, new CancelJobProcessor(prev, status.failCause()));
-            else
-                transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP));
-        }
-
-        /**
-         * Handles a finished MAP task: removes its split from the pending splits of the job. When the
-         * task that finished was the last local mapper, the shuffle is flushed first and the metadata
-         * is updated from the flush listener.
-         *
-         * @param taskInfo Task info.
-         * @param status Task status.
-         * @param prev Previous closure.
-         */
-        private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status,
-            final StackedProcessor prev) {
-            final HadoopJobId jobId = taskInfo.jobId();
-
-            // The counter is incremented for every finished mapper, failed ones included.
-            boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();
-
-            if (status.state() == FAILED || status.state() == CRASHED) {
-                // Fail the whole job.
-                transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));
-
-                return;
-            }
-
-            // Removes the split from the metadata, passing along a failure of the given future (if any).
-            IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() {
-                @Override public void applyx(IgniteInternalFuture<?> f) {
-                    Throwable err = null;
-
-                    if (f != null) {
-                        try {
-                            f.get();
-                        }
-                        catch (IgniteCheckedException e) {
-                            err = e;
-                        }
-                    }
-
-                    transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
-                }
-            };
-
-            // Flush the shuffle only after the last local mapper; otherwise update metadata right away.
-            if (lastMapperFinished)
-                ctx.shuffle().flush(jobId).listen(cacheUpdater);
-            else
-                cacheUpdater.apply(null);
-        }
-
-        /**
-         * Handles a finished REDUCE task: removes the reducer from the pending reducers of the job,
-         * passing the fail cause along when the task failed or crashed.
-         *
-         * @param taskInfo Task info.
-         * @param status Task status.
-         * @param prev Previous closure.
-         */
-        private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
-            HadoopJobId jobId = taskInfo.jobId();
-            if (status.state() == FAILED || status.state() == CRASHED)
-                // Fail the whole job.
-                transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()));
-            else
-                transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber()));
-        }
-
-        /**
-         * Handles a finished COMBINE task: removes all local mapper splits from the pending splits of
-         * the job, either immediately with the fail cause, or after the shuffle has been flushed.
-         *
-         * @param taskInfo Task info.
-         * @param status Task status.
-         * @param prev Previous closure.
-         */
-        private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status,
-            final StackedProcessor prev) {
-            final HadoopJobId jobId = taskInfo.jobId();
-
-            // Note: the live currMappers collection (not a copy) is handed to the processor below.
-            if (status.state() == FAILED || status.state() == CRASHED)
-                // Fail the whole job.
-                transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));
-            else {
-                ctx.shuffle().flush(jobId).listen(new CIX1<IgniteInternalFuture<?>>() {
-                    @Override public void applyx(IgniteInternalFuture<?> f) {
-                        Throwable err = null;
-
-                        if (f != null) {
-                            try {
-                                f.get();
-                            }
-                            catch (IgniteCheckedException e) {
-                                err = e;
-                            }
-                        }
-
-                        transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
-                    }
-                });
-            }
-        }
-
-        /**
-         * Marks the job as cancelled unless it is already cancelled or aborted.
-         *
-         * @return {@code True} if job was cancelled by this (first) call.
-         */
-        public boolean onCancel() {
-            if (!cancelled && !aborted) {
-                cancelled = true;
-
-                return true;
-            }
-
-            return false;
-        }
-
-        /**
-         * Marks the job as aborted unless it is already aborted.
-         *
-         * @return {@code True} if job was aborted by this (first) call.
-         */
-        public boolean onAborted() {
-            if (!aborted) {
-                aborted = true;
-
-                return true;
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * Transform closure that sets the phase of the job metadata copy.
-     */
-    private static class UpdatePhaseProcessor extends StackedProcessor {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Phase to switch the job to. */
-        private final HadoopJobPhase phase;
-
-        /**
-         * @param prevProc Previous closure.
-         * @param newPhase Phase to update.
-         */
-        private UpdatePhaseProcessor(@Nullable StackedProcessor prevProc, HadoopJobPhase newPhase) {
-            super(prevProc);
-
-            phase = newPhase;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
-            cp.phase(phase);
-        }
-    }
-
-    /**
-     * Transform closure that removes mapper splits from the pending splits of a job. An error moves
-     * the job to the cancelling phase; an empty pending set moves a non-cancelling job to the reduce phase.
-     */
-    private static class RemoveMappersProcessor extends StackedProcessor {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Mapper splits to remove. */
-        private final Collection<HadoopInputSplit> splits;
-
-        /** Error. */
-        private final Throwable err;
-
-        /**
-         * @param prevProc Previous closure.
-         * @param split Mapper split to remove.
-         * @param err Error.
-         */
-        private RemoveMappersProcessor(@Nullable StackedProcessor prevProc, HadoopInputSplit split, Throwable err) {
-            this(prevProc, Collections.singletonList(split), err);
-        }
-
-        /**
-         * @param prevProc Previous closure.
-         * @param splits Mapper splits to remove.
-         * @param err Error.
-         */
-        private RemoveMappersProcessor(@Nullable StackedProcessor prevProc, Collection<HadoopInputSplit> splits,
-            Throwable err) {
-            super(prevProc);
-
-            this.splits = splits;
-            this.err = err;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
-            Map<HadoopInputSplit, Integer> pending = new HashMap<>(cp.pendingSplits());
-
-            for (HadoopInputSplit split : splits)
-                pending.remove(split);
-
-            cp.pendingSplits(pending);
-
-            if (err != null) {
-                // The fail cause is recorded only if the job is not cancelling already.
-                if (cp.phase() != PHASE_CANCELLING)
-                    cp.failCause(err);
-
-                cp.phase(PHASE_CANCELLING);
-            }
-
-            // All mappers are done and the job is not being cancelled: proceed to reducers.
-            if (pending.isEmpty() && cp.phase() != PHASE_CANCELLING)
-                cp.phase(PHASE_REDUCE);
-        }
-    }
-
-    /**
-     * Transform closure that removes a reducer from the pending reducers of a job. When an error is
-     * given, the job is moved to the cancelling phase and the error becomes its fail cause.
-     */
-    private static class RemoveReducerProcessor extends StackedProcessor {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Reducer to remove. */
-        private final int rdc;
-
-        /** Error. */
-        private Throwable err;
-
-        /**
-         * @param prevProc Previous closure.
-         * @param rdc Reducer to remove.
-         */
-        private RemoveReducerProcessor(@Nullable StackedProcessor prevProc, int rdc) {
-            this(prevProc, rdc, null);
-        }
-
-        /**
-         * @param prevProc Previous closure.
-         * @param rdc Reducer to remove.
-         * @param err Error.
-         */
-        private RemoveReducerProcessor(@Nullable StackedProcessor prevProc, int rdc, Throwable err) {
-            super(prevProc);
-
-            this.rdc = rdc;
-            this.err = err;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
-            Collection<Integer> pending = new HashSet<>(cp.pendingReducers());
-
-            pending.remove(rdc);
-
-            cp.pendingReducers(pending);
-
-            if (err == null)
-                return;
-
-            cp.phase(PHASE_CANCELLING);
-            cp.failCause(err);
-        }
-    }
-
-    /**
-     * Initialize reducers.
-     */
-    private static class InitializeReducersProcessor extends StackedProcessor {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Reducers. */
-        private final Collection<Integer> rdc;
-
-        /** Process descriptor for reducers. */
-        private final HadoopProcessDescriptor desc;
-
-        /**
-         * @param prev Previous closure.
-         * @param rdc Reducers to initialize.
-         * @param desc External process descriptor.
-         */
-        private InitializeReducersProcessor(@Nullable StackedProcessor prev,
-            Collection<Integer> rdc,
-            HadoopProcessDescriptor desc) {
-            super(prev);
-
-            assert !F.isEmpty(rdc);
-            assert desc != null;
-
-            this.rdc = rdc;
-            this.desc = desc;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
-            Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses();
-
-            Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ?
-                new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap);
-
-            for (Integer r : rdc)
-                rdcMap.put(r, desc);
-
-            cp.reducersAddresses(rdcMap);
-        }
-    }
-
-    /**
-     * Cancel job transform closure.
-     */
-    private static class CancelJobProcessor extends StackedProcessor {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Mapper splits to remove. */
-        private final Collection<HadoopInputSplit> splits;
-
-        /** Reducers to remove. */
-        private final Collection<Integer> rdc;
-
-        /** Error. */
-        private final Throwable err;
-
-        /**
-         * @param prev Previous closure.
-         * @param err Fail cause.
-         */
-        private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
-            this(prev, err, null, null);
-        }
-
-        /**
-         * @param prev Previous closure.
-         * @param splits Splits to remove.
-         * @param rdc Reducers to remove.
-         */
-        private CancelJobProcessor(@Nullable StackedProcessor prev,
-            Collection<HadoopInputSplit> splits,
-            Collection<Integer> rdc) {
-            this(prev, null, splits, rdc);
-        }
-
-        /**
-         * @param prev Previous closure.
-         * @param err Error.
-         * @param splits Splits to remove.
-         * @param rdc Reducers to remove.
-         */
-        private CancelJobProcessor(@Nullable StackedProcessor prev,
-            Throwable err,
-            Collection<HadoopInputSplit> splits,
-            Collection<Integer> rdc) {
-            super(prev);
-
-            this.splits = splits;
-            this.rdc = rdc;
-            this.err = err;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
-            final HadoopJobPhase currPhase = meta.phase();
-
-            assert currPhase == PHASE_CANCELLING || currPhase == PHASE_COMPLETE
-                    || err != null: "Invalid phase for cancel: " + currPhase;
-
-            Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
-
-            if (rdc != null)
-                rdcCp.removeAll(rdc);
-
-            cp.pendingReducers(rdcCp);
-
-            Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
-
-            if (splits != null) {
-                for (HadoopInputSplit s : splits)
-                    splitsCp.remove(s);
-            }
-
-            cp.pendingSplits(splitsCp);
-
-            if (currPhase != PHASE_COMPLETE && currPhase != PHASE_CANCELLING)
-                cp.phase(PHASE_CANCELLING);
-
-            if (err != null)
-                cp.failCause(err);
-        }
-    }
-
-    /**
-     * Increment counter values closure.
-     */
-    private static class IncrementCountersProcessor extends StackedProcessor {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final HadoopCounters counters;
-
-        /**
-         * @param prev Previous closure.
-         * @param counters Task counters to add into job counters.
-         */
-        private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) {
-            super(prev);
-
-            assert counters != null;
-
-            this.counters = counters;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
-            HadoopCounters cntrs = new HadoopCountersImpl(cp.counters());
-
-            cntrs.merge(counters);
-
-            cp.counters(cntrs);
-        }
-    }
-
-    /**
-     * Abstract stacked closure.
-     */
-    private abstract static class StackedProcessor implements
-        EntryProcessor<HadoopJobId, HadoopJobMetadata, Void>, Serializable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final StackedProcessor prev;
-
-        /**
-         * @param prev Previous closure.
-         */
-        private StackedProcessor(@Nullable StackedProcessor prev) {
-            this.prev = prev;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void process(MutableEntry<HadoopJobId, HadoopJobMetadata> e, Object... args) {
-            HadoopJobMetadata val = apply(e.getValue());
-
-            if (val != null)
-                e.setValue(val);
-            else
-                e.remove();
-
-            return null;
-        }
-
-        /**
-         * @param meta Old value.
-         * @return New value.
-         */
-        private HadoopJobMetadata apply(HadoopJobMetadata meta) {
-            if (meta == null)
-                return null;
-
-            HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta);
-
-            update(meta, cp);
-
-            return cp;
-        }
-
-        /**
-         * Update given job metadata object.
-         *
-         * @param meta Initial job metadata.
-         * @param cp Copy.
-         */
-        protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
deleted file mode 100644
index 0d7bd3a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.message;
-
-import java.io.Externalizable;
-
-/**
- * Marker interface for all hadoop messages.
- */
-public interface HadoopMessage extends Externalizable {
-    // No-op.
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
deleted file mode 100644
index f01f72b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
-
-/**
- * Base class for map-reduce planners.
- */
-public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
-    /** Injected grid. */
-    @IgniteInstanceResource
-    protected Ignite ignite;
-
-    /** Logger. */
-    @SuppressWarnings("UnusedDeclaration")
-    @LoggerResource
-    protected IgniteLogger log;
-
-    /**
-     * Create plan topology.
-     *
-     * @param nodes Topology nodes.
-     * @return Plan topology.
-     */
-    protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
-        Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
-
-        Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
-        Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
-
-        for (ClusterNode node : nodes) {
-            String macs = node.attribute(ATTR_MACS);
-
-            HadoopMapReducePlanGroup grp = macsMap.get(macs);
-
-            if (grp == null) {
-                grp = new HadoopMapReducePlanGroup(node, macs);
-
-                macsMap.put(macs, grp);
-            }
-            else
-                grp.add(node);
-
-            idToGrp.put(node.id(), grp);
-
-            for (String host : node.addresses()) {
-                HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
-
-                if (hostGrp == null)
-                    hostToGrp.put(host, grp);
-                else
-                    assert hostGrp == grp;
-            }
-        }
-
-        return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
-    }
-
-
-    /**
-     * Groups nodes by host names.
-     *
-     * @param top Topology to group.
-     * @return Map from host name to the IDs of the nodes reporting that host name.
-     */
-    protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
-        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
-        for (ClusterNode node : top) {
-            for (String host : node.hostNames()) {
-                Collection<UUID> nodeIds = grouped.get(host);
-
-                if (nodeIds == null) {
-                    // Expecting 1-2 nodes per host.
-                    nodeIds = new ArrayList<>(2);
-
-                    grouped.put(host, nodeIds);
-                }
-
-                nodeIds.add(node.id());
-            }
-        }
-
-        return grouped;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
deleted file mode 100644
index 15c62c8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Map-reduce plan.
- */
-public class HadoopDefaultMapReducePlan implements HadoopMapReducePlan {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Mappers map. */
-    private Map<UUID, Collection<HadoopInputSplit>> mappers;
-
-    /** Reducers map. */
-    private Map<UUID, int[]> reducers;
-
-    /** Mappers count. */
-    private int mappersCnt;
-
-    /** Reducers count. */
-    private int reducersCnt;
-
-    /**
-     * @param mappers Mappers map.
-     * @param reducers Reducers map.
-     */
-    public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> mappers,
-        Map<UUID, int[]> reducers) {
-        this.mappers = mappers;
-        this.reducers = reducers;
-
-        if (mappers != null) {
-            for (Collection<HadoopInputSplit> splits : mappers.values())
-                mappersCnt += splits.size();
-        }
-
-        if (reducers != null) {
-            for (int[] rdcrs : reducers.values())
-                reducersCnt += rdcrs.length;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int mappers() {
-        return mappersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int reducers() {
-        return reducersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID nodeForReducer(int reducer) {
-        assert reducer >= 0 && reducer < reducersCnt : reducer;
-
-        for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
-            for (int r : entry.getValue()) {
-                if (r == reducer)
-                    return entry.getKey();
-            }
-        }
-
-        throw new IllegalStateException("Not found reducer index: " + reducer);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId) {
-        return mappers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public int[] reducers(UUID nodeId) {
-        return reducers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> mapperNodeIds() {
-        return mappers.keySet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> reducerNodeIds() {
-        return reducers.keySet();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
deleted file mode 100644
index 2fe8682..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.ArrayList;
-import java.util.UUID;
-
-/**
- * Map-reduce plan group of nodes on a single physical machine.
- */
-public class HadoopMapReducePlanGroup {
-    /** Node. */
-    private ClusterNode node;
-
-    /** Nodes. */
-    private ArrayList<ClusterNode> nodes;
-
-    /** MAC addresses. */
-    private final String macs;
-
-    /** Weight. */
-    private int weight;
-
-    /**
-     * Constructor.
-     *
-     * @param node First node in the group.
-     * @param macs MAC addresses.
-     */
-    public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
-        assert node != null;
-        assert macs != null;
-
-        this.node = node;
-        this.macs = macs;
-    }
-
-    /**
-     * Add node to the group.
-     *
-     * @param newNode New node.
-     */
-    public void add(ClusterNode newNode) {
-        if (node != null) {
-            nodes = new ArrayList<>(2);
-
-            nodes.add(node);
-
-            node = null;
-        }
-
-        nodes.add(newNode);
-    }
-
-    /**
-     * @return MAC addresses.
-     */
-    public String macs() {
-        return macs;
-    }
-
-    /**
-     * @return {@code True} if only a single node is present.
-     */
-    public boolean single() {
-        return nodeCount() == 1;
-    }
-
-    /**
-     * Get node ID by index.
-     *
-     * @param idx Index.
-     * @return Node ID.
-     */
-    public UUID nodeId(int idx) {
-        ClusterNode res;
-
-        if (node != null) {
-            assert idx == 0;
-
-            res = node;
-        }
-        else {
-            assert nodes != null;
-            assert idx < nodes.size();
-
-            res = nodes.get(idx);
-        }
-
-        assert res != null;
-
-        return res.id();
-    }
-
-    /**
-     * @return Node count.
-     */
-    public int nodeCount() {
-        return node != null ? 1 : nodes.size();
-    }
-
-    /**
-     * @return weight.
-     */
-    public int weight() {
-        return weight;
-    }
-
-    /**
-     * @param weight weight.
-     */
-    public void weight(int weight) {
-        this.weight = weight;
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return macs.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, ((HadoopMapReducePlanGroup)obj).macs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopMapReducePlanGroup.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
deleted file mode 100644
index fa5c469..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Map-reduce plan topology.
- */
-public class HadoopMapReducePlanTopology {
-    /** All groups. */
-    private final List<HadoopMapReducePlanGroup> grps;
-
-    /** Node ID to group map. */
-    private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
-
-    /** Host to group map. */
-    private final Map<String, HadoopMapReducePlanGroup> hostToGrp;
-
-    /**
-     * Constructor.
-     *
-     * @param grps All groups.
-     * @param idToGrp ID to group map.
-     * @param hostToGrp Host to group map.
-     */
-    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
-        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, HadoopMapReducePlanGroup> hostToGrp) {
-        assert grps != null;
-        assert idToGrp != null;
-        assert hostToGrp != null;
-
-        this.grps = grps;
-        this.idToGrp = idToGrp;
-        this.hostToGrp = hostToGrp;
-    }
-
-    /**
-     * @return All groups.
-     */
-    public List<HadoopMapReducePlanGroup> groups() {
-        return grps;
-    }
-
-    /**
-     * Get group for node ID.
-     *
-     * @param id Node ID.
-     * @return Group.
-     */
-    public HadoopMapReducePlanGroup groupForId(UUID id) {
-        return idToGrp.get(id);
-    }
-
-    /**
-     * Get group for host.
-     *
-     * @param host Host.
-     * @return Group.
-     */
-    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
-        return hostToGrp.get(host);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopMapReducePlanTopology.class, this);
-    }
-}


Mime
View raw message