From: yzhdanov@apache.org
To: commits@ignite.incubator.apache.org
Date: Thu, 05 Mar 2015 09:05:28 -0000
Subject: [28/58] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
deleted file mode 100644
index 0beaf32..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ /dev/null
@@ -1,1625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.jobtracker; - -import org.apache.ignite.*; -import org.apache.ignite.events.*; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import javax.cache.event.*; -import javax.cache.expiry.*; -import javax.cache.processor.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*; - -/** - * Hadoop job tracker. - */ -public class GridHadoopJobTracker extends GridHadoopComponent { - /** */ - private final GridMutex mux = new GridMutex(); - - /** */ - private volatile GridCacheProjectionEx jobMetaPrj; - - /** Projection with expiry policy for finished job updates. */ - private volatile GridCacheProjectionEx finishedJobMetaPrj; - - /** Map-reduce execution planner. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private GridHadoopMapReducePlanner mrPlanner; - - /** All the known jobs. */ - private final ConcurrentMap> jobs = new ConcurrentHashMap8<>(); - - /** Locally active jobs. */ - private final ConcurrentMap activeJobs = new ConcurrentHashMap8<>(); - - /** Locally requested finish futures. */ - private final ConcurrentMap> activeFinishFuts = - new ConcurrentHashMap8<>(); - - /** Event processing service. */ - private ExecutorService evtProcSvc; - - /** Component busy lock. */ - private GridSpinReadWriteLock busyLock; - - /** Closure to check result of async transform of system cache. */ - private final IgniteInClosure> failsLog = new CI1>() { - @Override public void apply(IgniteInternalFuture gridFut) { - try { - gridFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to transform system cache.", e); - } - } - }; - - /** {@inheritDoc} */ - @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException { - super.start(ctx); - - busyLock = new GridSpinReadWriteLock(); - - evtProcSvc = Executors.newFixedThreadPool(1); - } - - /** - * @return Job meta projection. 
- */ - @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private GridCacheProjectionEx jobMetaCache() { - GridCacheProjectionEx prj = jobMetaPrj; - - if (prj == null) { - synchronized (mux) { - if ((prj = jobMetaPrj) == null) { - CacheProjection sysCache = ctx.kernalContext().cache() - .cache(CU.SYS_CACHE_HADOOP_MR); - - assert sysCache != null; - - mrPlanner = ctx.planner(); - - try { - ctx.kernalContext().resource().injectGeneric(mrPlanner); - } - catch (IgniteCheckedException e) { // Must not happen. - U.error(log, "Failed to inject resources.", e); - - throw new IllegalStateException(e); - } - - jobMetaPrj = prj = (GridCacheProjectionEx) - sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class); - - if (ctx.configuration().getFinishedJobInfoTtl() > 0) { - ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( - new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); - - finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc); - } - else - finishedJobMetaPrj = jobMetaPrj; - } - } - } - - return prj; - } - - /** - * @return Projection with expiry policy for finished job updates. - */ - private GridCacheProjectionEx finishedJobMetaCache() { - GridCacheProjectionEx prj = finishedJobMetaPrj; - - if (prj == null) { - jobMetaCache(); - - prj = finishedJobMetaPrj; - - assert prj != null; - } - - return prj; - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobMetaCache().context().continuousQueries().executeInternalQuery( - new CacheEntryUpdatedListener() { - @Override public void onUpdated(final Iterable> evts) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process query callback in a separate thread to avoid deadlocks. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() throws IgniteCheckedException { - processJobMetadataUpdates(evts); - } - }); - } - finally { - busyLock.readUnlock(); - } - } - }, - null, - true, - true - ); - - ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(final Event evt) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process discovery callback in a separate thread to avoid deadlock. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() { - processNodeLeft((DiscoveryEvent)evt); - } - }); - } - finally { - busyLock.readUnlock(); - } - } - }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - busyLock.writeLock(); - - evtProcSvc.shutdown(); - - // Fail all pending futures. - for (GridFutureAdapter fut : activeFinishFuts.values()) - fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); - } - - /** - * Submits execution of Hadoop job to grid. - * - * @param jobId Job ID. - * @param info Job info. - * @return Job completion future. - */ - @SuppressWarnings("unchecked") - public IgniteInternalFuture submit(GridHadoopJobId jobId, GridHadoopJobInfo info) { - if (!busyLock.tryReadLock()) { - return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " + - "(grid is stopping): " + info)); - } - - try { - long jobPrepare = U.currentTimeMillis(); - - if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) - throw new IgniteCheckedException("Failed to submit job. 
Job with the same ID already exists: " + jobId); - - GridHadoopJob job = job(jobId, info); - - GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); - - GridHadoopJobMetadata meta = new GridHadoopJobMetadata(ctx.localNodeId(), jobId, info); - - meta.mapReducePlan(mrPlan); - - meta.pendingSplits(allSplits(mrPlan)); - meta.pendingReducers(allReducers(mrPlan)); - - GridFutureAdapter completeFut = new GridFutureAdapter<>(); - - GridFutureAdapter old = activeFinishFuts.put(jobId, completeFut); - - assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']'; - - if (log.isDebugEnabled()) - log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']'); - - long jobStart = U.currentTimeMillis(); - - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(meta.counters(), - ctx.localNodeId()); - - perfCntr.clientSubmissionEvents(info); - perfCntr.onJobPrepare(jobPrepare); - perfCntr.onJobStart(jobStart); - - if (jobMetaCache().putIfAbsent(jobId, meta) != null) - throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); - - return completeFut; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to submit job: " + jobId, e); - - return new GridFinishedFutureEx<>(e); - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Convert Hadoop job metadata to job status. - * - * @param meta Metadata. - * @return Status. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public static GridHadoopJobStatus status(GridHadoopJobMetadata meta) { - GridHadoopJobInfo jobInfo = meta.jobInfo(); - - return new GridHadoopJobStatus( - meta.jobId(), - jobInfo.jobName(), - jobInfo.user(), - meta.pendingSplits() != null ? meta.pendingSplits().size() : 0, - meta.pendingReducers() != null ? meta.pendingReducers().size() : 0, - meta.mapReducePlan().mappers(), - meta.mapReducePlan().reducers(), - meta.phase(), - meta.failCause() != null, - meta.version() - ); - } - - /** - * Gets hadoop job status for given job ID. - * - * @param jobId Job ID to get status for. - * @return Job status for given job ID or {@code null} if job was not found. - */ - @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; // Grid is stopping. - - try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); - - return meta != null ? status(meta) : null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Gets job finish future. - * - * @param jobId Job ID. - * @return Finish future or {@code null}. - * @throws IgniteCheckedException If failed. - */ - @Nullable public IgniteInternalFuture finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; // Grid is stopping. - - try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta == null) - return null; - - if (log.isTraceEnabled()) - log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); - - if (meta.phase() == PHASE_COMPLETE) { - if (log.isTraceEnabled()) - log.trace("Job is complete, returning finished future: " + jobId); - - return new GridFinishedFutureEx<>(jobId, meta.failCause()); - } - - GridFutureAdapter fut = F.addIfAbsent(activeFinishFuts, jobId, - new GridFutureAdapter()); - - // Get meta from cache one more time to close the window. 
- meta = jobMetaCache().get(jobId); - - if (log.isTraceEnabled()) - log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); - - if (meta == null) { - fut.onDone(); - - activeFinishFuts.remove(jobId , fut); - } - else if (meta.phase() == PHASE_COMPLETE) { - fut.onDone(jobId, meta.failCause()); - - activeFinishFuts.remove(jobId , fut); - } - - return fut; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Gets job plan by job ID. - * - * @param jobId Job ID. - * @return Job plan. - * @throws IgniteCheckedException If failed. - */ - public GridHadoopMapReducePlan plan(GridHadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; - - try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta != null) - return meta.mapReducePlan(); - - return null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Callback from task executor invoked when a task has been finished. - * - * @param info Task info. - * @param status Task status. - */ - @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) - public void onTaskFinished(GridHadoopTaskInfo info, GridHadoopTaskStatus status) { - if (!busyLock.tryReadLock()) - return; - - try { - assert status.state() != RUNNING; - - if (log.isDebugEnabled()) - log.debug("Received task finished callback [info=" + info + ", status=" + status + ']'); - - JobLocalState state = activeJobs.get(info.jobId()); - - // Task CRASHes with null fail cause. - assert (status.state() != FAILED) || status.failCause() != null : - "Invalid task status [info=" + info + ", status=" + status + ']'; - - assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)): - "Missing local state for finished task [info=" + info + ", status=" + status + ']'; - - StackedProcessor incrCntrs = null; - - if (status.state() == COMPLETED) - incrCntrs = new IncrementCountersProcessor(null, status.counters()); - - switch (info.type()) { - case SETUP: { - state.onSetupFinished(info, status, incrCntrs); - - break; - } - - case MAP: { - state.onMapFinished(info, status, incrCntrs); - - break; - } - - case REDUCE: { - state.onReduceFinished(info, status, incrCntrs); - - break; - } - - case COMBINE: { - state.onCombineFinished(info, status, incrCntrs); - - break; - } - - case COMMIT: - case ABORT: { - GridCacheProjectionEx cache = finishedJobMetaCache(); - - cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). - listenAsync(failsLog); - - break; - } - } - } - finally { - busyLock.readUnlock(); - } - } - - /** - * @param jobId Job id. - * @param c Closure of operation. - */ - private void transform(GridHadoopJobId jobId, EntryProcessor c) { - jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog); - } - - /** - * Callback from task executor called when process is ready to received shuffle messages. - * - * @param jobId Job ID. - * @param reducers Reducers. - * @param desc Process descriptor. - */ - public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection reducers, - GridHadoopProcessDescriptor desc) { - transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); - } - - /** - * Gets all input splits for given hadoop map-reduce plan. - * - * @param plan Map-reduce plan. - * @return Collection of all input splits that should be processed. 
- */ - @SuppressWarnings("ConstantConditions") - private Map allSplits(GridHadoopMapReducePlan plan) { - Map res = new HashMap<>(); - - int taskNum = 0; - - for (UUID nodeId : plan.mapperNodeIds()) { - for (GridHadoopInputSplit split : plan.mappers(nodeId)) { - if (res.put(split, taskNum++) != null) - throw new IllegalStateException("Split duplicate."); - } - } - - return res; - } - - /** - * Gets all reducers for this job. - * - * @param plan Map-reduce plan. - * @return Collection of reducers. - */ - private Collection allReducers(GridHadoopMapReducePlan plan) { - Collection res = new HashSet<>(); - - for (int i = 0; i < plan.reducers(); i++) - res.add(i); - - return res; - } - - /** - * Processes node leave (or fail) event. - * - * @param evt Discovery event. - */ - @SuppressWarnings("ConstantConditions") - private void processNodeLeft(DiscoveryEvent evt) { - if (log.isDebugEnabled()) - log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']'); - - // Check only if this node is responsible for job status updates. - if (ctx.jobUpdateLeader()) { - boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder(); - - // Iteration over all local entries is correct since system cache is REPLICATED. - for (Object metaObj : jobMetaCache().values()) { - GridHadoopJobMetadata meta = (GridHadoopJobMetadata)metaObj; - - GridHadoopJobId jobId = meta.jobId(); - - GridHadoopMapReducePlan plan = meta.mapReducePlan(); - - GridHadoopJobPhase phase = meta.phase(); - - try { - if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { - // Failover setup task. - GridHadoopJob job = job(jobId, meta.jobInfo()); - - Collection setupTask = setupTask(jobId); - - assert setupTask != null; - - ctx.taskExecutor().run(job, setupTask); - } - else if (phase == PHASE_MAP || phase == PHASE_REDUCE) { - // Must check all nodes, even that are not event node ID due to - // multiple node failure possibility. - Collection cancelSplits = null; - - for (UUID nodeId : plan.mapperNodeIds()) { - if (ctx.kernalContext().discovery().node(nodeId) == null) { - // Node has left the grid. - Collection mappers = plan.mappers(nodeId); - - if (cancelSplits == null) - cancelSplits = new HashSet<>(); - - cancelSplits.addAll(mappers); - } - } - - Collection cancelReducers = null; - - for (UUID nodeId : plan.reducerNodeIds()) { - if (ctx.kernalContext().discovery().node(nodeId) == null) { - // Node has left the grid. - int[] reducers = plan.reducers(nodeId); - - if (cancelReducers == null) - cancelReducers = new HashSet<>(); - - for (int rdc : reducers) - cancelReducers.add(rdc); - } - } - - if (cancelSplits != null || cancelReducers != null) - jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException( - "One or more nodes participating in map-reduce job execution failed."), cancelSplits, - cancelReducers)); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cancel job: " + meta, e); - } - } - } - } - - /** - * @param updated Updated cache entries. - * @throws IgniteCheckedException If failed. 
- */ - private void processJobMetadataUpdates( - Iterable> updated) - throws IgniteCheckedException { - UUID locNodeId = ctx.localNodeId(); - - for (CacheEntryEvent entry : updated) { - GridHadoopJobId jobId = entry.getKey(); - GridHadoopJobMetadata meta = entry.getValue(); - - if (meta == null || !ctx.isParticipating(meta)) - continue; - - if (log.isDebugEnabled()) - log.debug("Processing job metadata update callback [locNodeId=" + locNodeId + - ", meta=" + meta + ']'); - - try { - ctx.taskExecutor().onJobStateChanged(meta); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process job state changed callback (will fail the job) " + - "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e); - - transform(jobId, new CancelJobProcessor(null, e)); - - continue; - } - - processJobMetaUpdate(jobId, meta, locNodeId); - } - } - - /** - * @param jobId Job ID. - * @param plan Map-reduce plan. - */ - private void printPlan(GridHadoopJobId jobId, GridHadoopMapReducePlan plan) { - log.info("Plan for " + jobId); - - SB b = new SB(); - - b.a(" Map: "); - - for (UUID nodeId : plan.mapperNodeIds()) - b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' '); - - log.info(b.toString()); - - b = new SB(); - - b.a(" Reduce: "); - - for (UUID nodeId : plan.reducerNodeIds()) - b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' '); - - log.info(b.toString()); - } - - /** - * @param jobId Job ID. - * @param meta Job metadata. - * @param locNodeId Local node ID. - * @throws IgniteCheckedException If failed. - */ - private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId) - throws IgniteCheckedException { - JobLocalState state = activeJobs.get(jobId); - - GridHadoopJob job = job(jobId, meta.jobInfo()); - - GridHadoopMapReducePlan plan = meta.mapReducePlan(); - - switch (meta.phase()) { - case PHASE_SETUP: { - if (ctx.jobUpdateLeader()) { - Collection setupTask = setupTask(jobId); - - if (setupTask != null) - ctx.taskExecutor().run(job, setupTask); - } - - break; - } - - case PHASE_MAP: { - // Check if we should initiate new task on local node. - Collection tasks = mapperTasks(plan.mappers(locNodeId), meta); - - if (tasks != null) - ctx.taskExecutor().run(job, tasks); - - break; - } - - case PHASE_REDUCE: { - if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) { - GridHadoopTaskInfo info = new GridHadoopTaskInfo(COMMIT, jobId, 0, 0, null); - - if (log.isDebugEnabled()) - log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId + - ", jobId=" + jobId + ']'); - - ctx.taskExecutor().run(job, Collections.singletonList(info)); - - break; - } - - Collection tasks = reducerTasks(plan.reducers(locNodeId), job); - - if (tasks != null) - ctx.taskExecutor().run(job, tasks); - - break; - } - - case PHASE_CANCELLING: { - // Prevent multiple task executor notification. - if (state != null && state.onCancel()) { - if (log.isDebugEnabled()) - log.debug("Cancelling local task execution for job: " + meta); - - ctx.taskExecutor().cancelTasks(jobId); - } - - if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) { - if (ctx.jobUpdateLeader()) { - if (state == null) - state = initState(jobId); - - // Prevent running multiple abort tasks. 
- if (state.onAborted()) { - GridHadoopTaskInfo info = new GridHadoopTaskInfo(ABORT, jobId, 0, 0, null); - - if (log.isDebugEnabled()) - log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId + - ", jobId=" + jobId + ']'); - - ctx.taskExecutor().run(job, Collections.singletonList(info)); - } - } - - break; - } - else { - // Check if there are unscheduled mappers or reducers. - Collection cancelMappers = new ArrayList<>(); - Collection cancelReducers = new ArrayList<>(); - - Collection mappers = plan.mappers(ctx.localNodeId()); - - if (mappers != null) { - for (GridHadoopInputSplit b : mappers) { - if (state == null || !state.mapperScheduled(b)) - cancelMappers.add(b); - } - } - - int[] rdc = plan.reducers(ctx.localNodeId()); - - if (rdc != null) { - for (int r : rdc) { - if (state == null || !state.reducerScheduled(r)) - cancelReducers.add(r); - } - } - - if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty()) - transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers)); - } - - break; - } - - case PHASE_COMPLETE: { - if (log.isDebugEnabled()) - log.debug("Job execution is complete, will remove local state from active jobs " + - "[jobId=" + jobId + ", meta=" + meta + ']'); - - if (state != null) { - state = activeJobs.remove(jobId); - - assert state != null; - - ctx.shuffle().jobFinished(jobId); - } - - GridFutureAdapter finishFut = activeFinishFuts.remove(jobId); - - if (finishFut != null) { - if (log.isDebugEnabled()) - log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']'); - - finishFut.onDone(jobId, meta.failCause()); - } - - if (ctx.jobUpdateLeader()) - job.cleanupStagingDirectory(); - - jobs.remove(jobId); - - job.dispose(false); - - if (ctx.jobUpdateLeader()) { - ClassLoader ldr = job.getClass().getClassLoader(); - - try { - String statWriterClsName = job.info().property(GridHadoopUtils.JOB_COUNTER_WRITER_PROPERTY); - - if (statWriterClsName != null) { - Class cls = ldr.loadClass(statWriterClsName); - - GridHadoopCounterWriter writer = (GridHadoopCounterWriter)cls.newInstance(); - - GridHadoopCounters cntrs = meta.counters(); - - writer.write(job.info(), jobId, cntrs); - } - } - catch (Exception e) { - log.error("Can't write statistic due to: ", e); - } - } - - break; - } - - default: - throw new IllegalStateException("Unknown phase: " + meta.phase()); - } - } - - /** - * Creates setup task based on job information. - * - * @param jobId Job ID. - * @return Setup task wrapped in collection. - */ - @Nullable private Collection setupTask(GridHadoopJobId jobId) { - if (activeJobs.containsKey(jobId)) - return null; - else { - initState(jobId); - - return Collections.singleton(new GridHadoopTaskInfo(SETUP, jobId, 0, 0, null)); - } - } - - /** - * Creates mapper tasks based on job information. - * - * @param mappers Mapper blocks. - * @param meta Job metadata. - * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. 
- */ - private Collection mapperTasks(Iterable mappers, GridHadoopJobMetadata meta) { - UUID locNodeId = ctx.localNodeId(); - GridHadoopJobId jobId = meta.jobId(); - - JobLocalState state = activeJobs.get(jobId); - - Collection tasks = null; - - if (mappers != null) { - if (state == null) - state = initState(jobId); - - for (GridHadoopInputSplit split : mappers) { - if (state.addMapper(split)) { - if (log.isDebugEnabled()) - log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId + - ", split=" + split + ']'); - - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); - - if (tasks == null) - tasks = new ArrayList<>(); - - tasks.add(taskInfo); - } - } - } - - return tasks; - } - - /** - * Creates reducer tasks based on job information. - * - * @param reducers Reducers (may be {@code null}). - * @param job Job instance. - * @return Collection of task infos. - */ - private Collection reducerTasks(int[] reducers, GridHadoopJob job) { - UUID locNodeId = ctx.localNodeId(); - GridHadoopJobId jobId = job.id(); - - JobLocalState state = activeJobs.get(jobId); - - Collection tasks = null; - - if (reducers != null) { - if (state == null) - state = initState(job.id()); - - for (int rdc : reducers) { - if (state.addReducer(rdc)) { - if (log.isDebugEnabled()) - log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId + - ", rdc=" + rdc + ']'); - - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(REDUCE, jobId, rdc, 0, null); - - if (tasks == null) - tasks = new ArrayList<>(); - - tasks.add(taskInfo); - } - } - } - - return tasks; - } - - /** - * Initializes local state for given job metadata. - * - * @param jobId Job ID. - * @return Local state. - */ - private JobLocalState initState(GridHadoopJobId jobId) { - return F.addIfAbsent(activeJobs, jobId, new JobLocalState()); - } - - /** - * Gets or creates job instance. - * - * @param jobId Job ID. - * @param jobInfo Job info. - * @return Job. - * @throws IgniteCheckedException If failed. - */ - @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException { - GridFutureAdapterEx fut = jobs.get(jobId); - - if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx())) != null) - return fut.get(); - - fut = jobs.get(jobId); - - GridHadoopJob job = null; - - try { - if (jobInfo == null) { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta == null) - throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId); - - jobInfo = meta.jobInfo(); - } - - job = jobInfo.createJob(jobId, log); - - job.initialize(false, ctx.localNodeId()); - - fut.onDone(job); - - return job; - } - catch (IgniteCheckedException e) { - fut.onDone(e); - - jobs.remove(jobId, fut); - - if (job != null) { - try { - job.dispose(false); - } - catch (IgniteCheckedException e0) { - U.error(log, "Failed to dispose job: " + jobId, e0); - } - } - - throw e; - } - } - - /** - * Kills job. - * - * @param jobId Job ID. - * @return {@code True} if job was killed. - * @throws IgniteCheckedException If failed. - */ - public boolean killJob(GridHadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return false; // Grid is stopping. 
- - try { - GridHadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) { - GridHadoopTaskCancelledException err = new GridHadoopTaskCancelledException("Job cancelled."); - - jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err)); - } - } - finally { - busyLock.readUnlock(); - } - - IgniteInternalFuture fut = finishFuture(jobId); - - if (fut != null) { - try { - fut.get(); - } - catch (Throwable e) { - if (e.getCause() instanceof GridHadoopTaskCancelledException) - return true; - } - } - - return false; - } - - /** - * Returns job counters. - * - * @param jobId Job identifier. - * @return Job counters or {@code null} if job cannot be found. - * @throws IgniteCheckedException If failed. - */ - @Nullable public GridHadoopCounters jobCounters(GridHadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; - - try { - final GridHadoopJobMetadata meta = jobMetaCache().get(jobId); - - return meta != null ? meta.counters() : null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Event handler protected by busy lock. - */ - private abstract class EventHandler implements Runnable { - /** {@inheritDoc} */ - @Override public void run() { - if (!busyLock.tryReadLock()) - return; - - try { - body(); - } - catch (Throwable e) { - U.error(log, "Unhandled exception while processing event.", e); - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Handler body. - */ - protected abstract void body() throws Exception; - } - - /** - * - */ - private class JobLocalState { - /** Mappers. */ - private final Collection currMappers = new HashSet<>(); - - /** Reducers. */ - private final Collection currReducers = new HashSet<>(); - - /** Number of completed mappers. */ - private final AtomicInteger completedMappersCnt = new AtomicInteger(); - - /** Cancelled flag. */ - private boolean cancelled; - - /** Aborted flag. */ - private boolean aborted; - - /** - * @param mapSplit Map split to add. - * @return {@code True} if mapper was added. - */ - private boolean addMapper(GridHadoopInputSplit mapSplit) { - return currMappers.add(mapSplit); - } - - /** - * @param rdc Reducer number to add. - * @return {@code True} if reducer was added. - */ - private boolean addReducer(int rdc) { - return currReducers.add(rdc); - } - - /** - * Checks whether this split was scheduled for given attempt. - * - * @param mapSplit Map split to check. - * @return {@code True} if mapper was scheduled. - */ - public boolean mapperScheduled(GridHadoopInputSplit mapSplit) { - return currMappers.contains(mapSplit); - } - - /** - * Checks whether this split was scheduled for given attempt. - * - * @param rdc Reducer number to check. - * @return {@code True} if reducer was scheduled. - */ - public boolean reducerScheduled(int rdc) { - return currReducers.contains(rdc); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { - final GridHadoopJobId jobId = taskInfo.jobId(); - - if (status.state() == FAILED || status.state() == CRASHED) - transform(jobId, new CancelJobProcessor(prev, status.failCause())); - else - transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP)); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. 
- */ - private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, - final StackedProcessor prev) { - final GridHadoopJobId jobId = taskInfo.jobId(); - - boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size(); - - if (status.state() == FAILED || status.state() == CRASHED) { - // Fail the whole job. - transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause())); - - return; - } - - IgniteInClosure> cacheUpdater = new CIX1>() { - @Override public void applyx(IgniteInternalFuture f) { - Throwable err = null; - - if (f != null) { - try { - f.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - } - - transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err)); - } - }; - - if (lastMapperFinished) - ctx.shuffle().flush(jobId).listenAsync(cacheUpdater); - else - cacheUpdater.apply(null); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { - GridHadoopJobId jobId = taskInfo.jobId(); - if (status.state() == FAILED || status.state() == CRASHED) - // Fail the whole job. - transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause())); - else - transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber())); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, - final StackedProcessor prev) { - final GridHadoopJobId jobId = taskInfo.jobId(); - - if (status.state() == FAILED || status.state() == CRASHED) - // Fail the whole job. - transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause())); - else { - ctx.shuffle().flush(jobId).listenAsync(new CIX1>() { - @Override public void applyx(IgniteInternalFuture f) { - Throwable err = null; - - if (f != null) { - try { - f.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - } - - transform(jobId, new RemoveMappersProcessor(prev, currMappers, err)); - } - }); - } - } - - /** - * @return {@code True} if job was cancelled by this (first) call. - */ - public boolean onCancel() { - if (!cancelled && !aborted) { - cancelled = true; - - return true; - } - - return false; - } - - /** - * @return {@code True} if job was aborted this (first) call. - */ - public boolean onAborted() { - if (!aborted) { - aborted = true; - - return true; - } - - return false; - } - } - - /** - * Update job phase transform closure. - */ - private static class UpdatePhaseProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Phase to update. */ - private final GridHadoopJobPhase phase; - - /** - * @param prev Previous closure. - * @param phase Phase to update. - */ - private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) { - super(prev); - - this.phase = phase; - } - - /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - cp.phase(phase); - } - } - - /** - * Remove mapper transform closure. - */ - private static class RemoveMappersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final Collection splits; - - /** Error. 
*/ - private final Throwable err; - - /** - * @param prev Previous closure. - * @param split Mapper split to remove. - * @param err Error. - */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) { - this(prev, Collections.singletonList(split), err); - } - - /** - * @param prev Previous closure. - * @param splits Mapper splits to remove. - * @param err Error. - */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection splits, - Throwable err) { - super(prev); - - this.splits = splits; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - Map splitsCp = new HashMap<>(cp.pendingSplits()); - - for (GridHadoopInputSplit s : splits) - splitsCp.remove(s); - - cp.pendingSplits(splitsCp); - - if (cp.phase() != PHASE_CANCELLING && err != null) - cp.failCause(err); - - if (err != null) - cp.phase(PHASE_CANCELLING); - - if (splitsCp.isEmpty()) { - if (cp.phase() != PHASE_CANCELLING) - cp.phase(PHASE_REDUCE); - } - } - } - - /** - * Remove reducer transform closure. - */ - private static class RemoveReducerProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final int rdc; - - /** Error. */ - private Throwable err; - - /** - * @param prev Previous closure. - * @param rdc Reducer to remove. - */ - private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) { - super(prev); - - this.rdc = rdc; - } - - /** - * @param prev Previous closure. - * @param rdc Reducer to remove. - * @param err Error. - */ - private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) { - super(prev); - - this.rdc = rdc; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - Collection rdcCp = new HashSet<>(cp.pendingReducers()); - - rdcCp.remove(rdc); - - cp.pendingReducers(rdcCp); - - if (err != null) { - cp.phase(PHASE_CANCELLING); - cp.failCause(err); - } - } - } - - /** - * Initialize reducers. - */ - private static class InitializeReducersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Reducers. */ - private final Collection rdc; - - /** Process descriptor for reducers. */ - private final GridHadoopProcessDescriptor desc; - - /** - * @param prev Previous closure. - * @param rdc Reducers to initialize. - * @param desc External process descriptor. - */ - private InitializeReducersProcessor(@Nullable StackedProcessor prev, - Collection rdc, - GridHadoopProcessDescriptor desc) { - super(prev); - - assert !F.isEmpty(rdc); - assert desc != null; - - this.rdc = rdc; - this.desc = desc; - } - - /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - Map oldMap = meta.reducersAddresses(); - - Map rdcMap = oldMap == null ? - new HashMap() : new HashMap<>(oldMap); - - for (Integer r : rdc) - rdcMap.put(r, desc); - - cp.reducersAddresses(rdcMap); - } - } - - /** - * Remove reducer transform closure. - */ - private static class CancelJobProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final Collection splits; - - /** Reducers to remove. */ - private final Collection rdc; - - /** Error. */ - private final Throwable err; - - /** - * @param prev Previous closure. 
- * @param err Fail cause. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) { - this(prev, err, null, null); - } - - /** - * @param prev Previous closure. - * @param splits Splits to remove. - * @param rdc Reducers to remove. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, - Collection splits, - Collection rdc) { - this(prev, null, splits, rdc); - } - - /** - * @param prev Previous closure. - * @param err Error. - * @param splits Splits to remove. - * @param rdc Reducers to remove. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, - Throwable err, - Collection splits, - Collection rdc) { - super(prev); - - this.splits = splits; - this.rdc = rdc; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta; - - Collection rdcCp = new HashSet<>(cp.pendingReducers()); - - if (rdc != null) - rdcCp.removeAll(rdc); - - cp.pendingReducers(rdcCp); - - Map splitsCp = new HashMap<>(cp.pendingSplits()); - - if (splits != null) { - for (GridHadoopInputSplit s : splits) - splitsCp.remove(s); - } - - cp.pendingSplits(splitsCp); - - cp.phase(PHASE_CANCELLING); - - if (err != null) - cp.failCause(err); - } - } - - /** - * Increment counter values closure. - */ - private static class IncrementCountersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final GridHadoopCounters counters; - - /** - * @param prev Previous closure. - * @param counters Task counters to add into job counters. - */ - private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) { - super(prev); - - assert counters != null; - - this.counters = counters; - } - - /** {@inheritDoc} */ - @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { - GridHadoopCounters cntrs = new GridHadoopCountersImpl(cp.counters()); - - cntrs.merge(counters); - - cp.counters(cntrs); - } - } - - /** - * Abstract stacked closure. - */ - private abstract static class StackedProcessor implements - EntryProcessor, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final StackedProcessor prev; - - /** - * @param prev Previous closure. - */ - private StackedProcessor(@Nullable StackedProcessor prev) { - this.prev = prev; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry e, Object... args) { - GridHadoopJobMetadata val = apply(e.getValue()); - - if (val != null) - e.setValue(val); - else - e.remove();; - - return null; - } - - /** - * @param meta Old value. - * @return New value. - */ - private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) { - if (meta == null) - return null; - - GridHadoopJobMetadata cp = prev != null ? prev.apply(meta) : new GridHadoopJobMetadata(meta); - - update(meta, cp); - - return cp; - } - - /** - * Update given job metadata object. - * - * @param meta Initial job metadata. - * @param cp Copy. 
- */ - protected abstract void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java new file mode 100644 index 0000000..3f574e9 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.jobtracker; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*; + +/** + * Hadoop job metadata. Internal object used for distributed job state tracking. + */ +public class HadoopJobMetadata implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private HadoopJobId jobId; + + /** Job info. */ + private HadoopJobInfo jobInfo; + + /** Node submitted job. */ + private UUID submitNodeId; + + /** Map-reduce plan. */ + private HadoopMapReducePlan mrPlan; + + /** Pending splits for which mapper should be executed. */ + private Map pendingSplits; + + /** Pending reducers. */ + private Collection pendingReducers; + + /** Reducers addresses. */ + @GridToStringInclude + private Map reducersAddrs; + + /** Job phase. */ + private HadoopJobPhase phase = PHASE_SETUP; + + /** Fail cause. */ + @GridToStringExclude + private Throwable failCause; + + /** Version. */ + private long ver; + + /** Job counters */ + private HadoopCounters counters = new HadoopCountersImpl(); + + /** + * Empty constructor required by {@link Externalizable}. + */ + public HadoopJobMetadata() { + // No-op. + } + + /** + * Constructor. + * + * @param submitNodeId Submit node ID. + * @param jobId Job ID. + * @param jobInfo Job info. 
+ */ + public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) { + this.jobId = jobId; + this.jobInfo = jobInfo; + this.submitNodeId = submitNodeId; + } + + /** + * Copy constructor. + * + * @param src Metadata to copy. + */ + public HadoopJobMetadata(HadoopJobMetadata src) { + // Make sure to preserve alphabetic order. + counters = src.counters; + failCause = src.failCause; + jobId = src.jobId; + jobInfo = src.jobInfo; + mrPlan = src.mrPlan; + pendingSplits = src.pendingSplits; + pendingReducers = src.pendingReducers; + phase = src.phase; + reducersAddrs = src.reducersAddrs; + submitNodeId = src.submitNodeId; + ver = src.ver + 1; + } + + /** + * @return Submit node ID. + */ + public UUID submitNodeId() { + return submitNodeId; + } + + /** + * @param phase Job phase. + */ + public void phase(HadoopJobPhase phase) { + this.phase = phase; + } + + /** + * @return Job phase. + */ + public HadoopJobPhase phase() { + return phase; + } + + /** + * Gets reducers addresses for external execution. + * + * @return Reducers addresses. + */ + public Map reducersAddresses() { + return reducersAddrs; + } + + /** + * Sets reducers addresses for external execution. + * + * @param reducersAddrs Map of addresses. + */ + public void reducersAddresses(Map reducersAddrs) { + this.reducersAddrs = reducersAddrs; + } + + /** + * Sets collection of pending splits. + * + * @param pendingSplits Collection of pending splits. + */ + public void pendingSplits(Map pendingSplits) { + this.pendingSplits = pendingSplits; + } + + /** + * Gets collection of pending splits. + * + * @return Collection of pending splits. + */ + public Map pendingSplits() { + return pendingSplits; + } + + /** + * Sets collection of pending reducers. + * + * @param pendingReducers Collection of pending reducers. + */ + public void pendingReducers(Collection pendingReducers) { + this.pendingReducers = pendingReducers; + } + + /** + * Gets collection of pending reducers. + * + * @return Collection of pending reducers. + */ + public Collection pendingReducers() { + return pendingReducers; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @param mrPlan Map-reduce plan. + */ + public void mapReducePlan(HadoopMapReducePlan mrPlan) { + assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; + + this.mrPlan = mrPlan; + } + + /** + * @return Map-reduce plan. + */ + public HadoopMapReducePlan mapReducePlan() { + return mrPlan; + } + + /** + * @return Job info. + */ + public HadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * Returns job counters. + * + * @return Collection of counters. + */ + public HadoopCounters counters() { + return counters; + } + + /** + * Sets counters. + * + * @param counters Collection of counters. + */ + public void counters(HadoopCounters counters) { + this.counters = counters; + } + + /** + * @param failCause Fail cause. + */ + public void failCause(Throwable failCause) { + assert failCause != null; + + if (this.failCause == null) // Keep the first error. + this.failCause = failCause; + } + + /** + * @return Fail cause. + */ + public Throwable failCause() { + return failCause; + } + + /** + * @return Version. + */ + public long version() { + return ver; + } + + /** + * @param split Split. + * @return Task number. 
+ */ + public int taskNumber(HadoopInputSplit split) { + return pendingSplits.get(split); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, submitNodeId); + out.writeObject(jobId); + out.writeObject(jobInfo); + out.writeObject(mrPlan); + out.writeObject(pendingSplits); + out.writeObject(pendingReducers); + out.writeObject(phase); + out.writeObject(failCause); + out.writeLong(ver); + out.writeObject(reducersAddrs); + out.writeObject(counters); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + submitNodeId = U.readUuid(in); + jobId = (HadoopJobId)in.readObject(); + jobInfo = (HadoopJobInfo)in.readObject(); + mrPlan = (HadoopMapReducePlan)in.readObject(); + pendingSplits = (Map)in.readObject(); + pendingReducers = (Collection)in.readObject(); + phase = (HadoopJobPhase)in.readObject(); + failCause = (Throwable)in.readObject(); + ver = in.readLong(); + reducersAddrs = (Map)in.readObject(); + counters = (HadoopCounters)in.readObject(); + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), + "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : + failCause.getClass().getName()); + } +}
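
Editorial note on the pattern above: both the removed job tracker and the new HadoopJobMetadata class express every job state transition as a StackedProcessor applied through a cache invoke. The processor copies the metadata (the copy constructor bumps the version counter), lets any chained previous processor update the copy first, and then applies its own change, so several transitions can be stacked into one atomic cache update. The stand-alone sketch below illustrates that copy-then-update chaining with hypothetical names and no Ignite dependencies; it is an illustration of the idea under those assumptions, not Ignite's actual API.

import java.util.Objects;

// Simplified stand-in for HadoopJobMetadata: mutable fields plus a
// version-bumping copy constructor, mirroring HadoopJobMetadata(HadoopJobMetadata src).
final class MetaState {
    String phase;
    long version;

    MetaState(String phase) {
        this.phase = Objects.requireNonNull(phase);
    }

    MetaState(MetaState src) {
        phase = src.phase;
        version = src.version + 1; // every applied chain produces exactly one new version
    }
}

// Simplified stand-in for StackedProcessor: the copy is made once at the root of
// the chain, then each processor in order mutates that single copy.
abstract class Stacked {
    private final Stacked prev;

    Stacked(Stacked prev) {
        this.prev = prev;
    }

    final MetaState apply(MetaState meta) {
        MetaState cp = prev != null ? prev.apply(meta) : new MetaState(meta);

        update(meta, cp);

        return cp;
    }

    /** @param meta Original metadata. @param cp Copy to modify. */
    protected abstract void update(MetaState meta, MetaState cp);
}

public class StackedDemo {
    public static void main(String[] args) {
        // First processor moves the job to the map phase.
        Stacked toMap = new Stacked(null) {
            @Override protected void update(MetaState meta, MetaState cp) {
                cp.phase = "PHASE_MAP";
            }
        };

        // Second processor is stacked on top and cancels the job; both updates
        // land in the same copy, as with a single jobMetaCache().invoke(...).
        Stacked cancel = new Stacked(toMap) {
            @Override protected void update(MetaState meta, MetaState cp) {
                cp.phase = "PHASE_CANCELLING";
            }
        };

        MetaState result = cancel.apply(new MetaState("PHASE_SETUP"));

        System.out.println(result.phase + " v" + result.version); // prints: PHASE_CANCELLING v1
    }
}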