From: vozerov@apache.org
To: commits@ignite.apache.org
Date: Wed, 21 Sep 2016 14:54:06 -0000
Subject: [74/92] [abbrv] ignite git commit: WIP.

WIP.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/facedf50
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/facedf50
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/facedf50

Branch: refs/heads/ignite-3949
Commit: facedf50d5617f524f80c44ead44c577febd8607
Parents: 39095f6
Author: vozerov-gridgain
Authored: Wed Sep 21 17:06:10 2016 +0300
Committer: vozerov-gridgain
Committed: Wed Sep 21 17:06:11 2016 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopComponent.java      |   62 +
 .../processors/hadoop/impl/HadoopComponent.java |   62 -
 .../processors/hadoop/impl/HadoopContext.java   |  202 ---
 .../processors/hadoop/impl/HadoopProcessor.java |    8 +-
 .../impl/jobtracker/HadoopJobMetadata.java      |  316 ----
 .../impl/jobtracker/HadoopJobTracker.java       | 1700 ------------------
 .../hadoop/impl/shuffle/HadoopShuffle.java      |  263 ---
 .../HadoopEmbeddedTaskExecutor.java             |  153 --
 .../taskexecutor/HadoopExecutorService.java     |  234 ---
 .../impl/taskexecutor/HadoopRunnableTask.java   |  293 ---
 .../taskexecutor/HadoopTaskExecutorAdapter.java |   59 -
 .../impl/taskexecutor/HadoopTaskState.java      |   38 -
 .../impl/taskexecutor/HadoopTaskStatus.java     |  116 --
 .../external/HadoopExternalTaskExecutor.java    |  976 ----------
 .../external/HadoopExternalTaskMetadata.java    |   67 -
 .../external/HadoopJobInfoUpdateRequest.java    |  113 --
 .../external/HadoopPrepareForJobRequest.java    |  130 --
 .../external/HadoopProcessDescriptor.java       |  149 --
 .../external/HadoopProcessStartedAck.java       |   47 -
 .../external/HadoopTaskExecutionRequest.java    |  114 --
 .../external/HadoopTaskFinishedMessage.java     |   94 -
 .../child/HadoopChildProcessRunner.java         |  460 -----
 .../child/HadoopExternalProcessStarter.java     |  301 ----
 .../HadoopAbstractCommunicationClient.java      |   96 -
 .../HadoopCommunicationClient.java              |   72 -
 .../HadoopExternalCommunication.java            | 1460 ---------------
 .../HadoopHandshakeTimeoutException.java        |   42 -
 .../communication/HadoopIpcToNioAdapter.java    |  248 ---
 .../communication/HadoopMarshallerFilter.java   |   86 -
 .../communication/HadoopMessageListener.java    |   39 -
 .../HadoopTcpNioCommunicationClient.java        |   93 -
 .../hadoop/jobtracker/HadoopJobMetadata.java    |  316 ++++
 .../hadoop/jobtracker/HadoopJobTracker.java     | 1699 +++++++++++++++++
 .../HadoopEmbeddedTaskExecutor.java             |  153 ++
 .../taskexecutor/HadoopExecutorService.java     |  234 +++
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  293 +++
 .../taskexecutor/HadoopTaskExecutorAdapter.java |   59 +
 .../hadoop/taskexecutor/HadoopTaskStatus.java   |  116 ++
 .../external/HadoopExternalTaskExecutor.java    |  976 ++++++++++
 .../external/HadoopExternalTaskMetadata.java    |   67 +
 .../external/HadoopJobInfoUpdateRequest.java    |  113 ++
 .../external/HadoopPrepareForJobRequest.java    |  130 ++
 .../external/HadoopProcessDescriptor.java       |  149 ++
 .../external/HadoopProcessStartedAck.java       |   47 +
 .../external/HadoopTaskExecutionRequest.java    |  114 ++
 .../external/HadoopTaskFinishedMessage.java     |   94 +
 .../child/HadoopChildProcessRunner.java         |  471 +++++
 .../child/HadoopExternalProcessStarter.java     |  301 ++++
 .../HadoopAbstractCommunicationClient.java      |   96 +
 .../HadoopCommunicationClient.java              |   72 +
 .../HadoopExternalCommunication.java            | 1460 +++++++++++++++
 .../HadoopHandshakeTimeoutException.java        |   42 +
 .../communication/HadoopIpcToNioAdapter.java    |  248 +++
 .../communication/HadoopMarshallerFilter.java   |   86 +
 .../communication/HadoopMessageListener.java    |   39 +
 .../HadoopTcpNioCommunicationClient.java        |   93 +
 .../hadoop/impl/HadoopCommandLineTest.java      |    2 +-
 .../taskexecutor/HadoopExecutorServiceTest.java |    2 +-
 .../HadoopExternalTaskExecutionSelfTest.java    |  232 ---
 .../HadoopExternalCommunicationSelfTest.java    |  220 ---
 .../HadoopExternalTaskExecutionSelfTest.java    |  232 +++
 .../HadoopExternalCommunicationSelfTest.java    |  220 +++
 .../hadoop/taskexecutor/HadoopTaskState.java    |   38 +
 63 files changed, 8027 insertions(+), 8480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
new file mode 100644
index 0000000..aeda5c0
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; + +/** + * Abstract class for all hadoop components. + */ +public abstract class HadoopComponent { + /** Hadoop context. */ + protected HadoopContext ctx; + + /** Logger. */ + protected IgniteLogger log; + + /** + * @param ctx Hadoop context. + */ + public void start(HadoopContext ctx) throws IgniteCheckedException { + this.ctx = ctx; + + log = ctx.kernalContext().log(getClass()); + } + + /** + * Stops manager. + */ + public void stop(boolean cancel) { + // No-op. + } + + /** + * Callback invoked when all grid components are started. + */ + public void onKernalStart() throws IgniteCheckedException { + // No-op. + } + + /** + * Callback invoked before all grid components are stopped. + */ + public void onKernalStop(boolean cancel) { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java deleted file mode 100644 index 453d23f..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopComponent.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; - -/** - * Abstract class for all hadoop components. - */ -public abstract class HadoopComponent { - /** Hadoop context. */ - protected HadoopContext ctx; - - /** Logger. */ - protected IgniteLogger log; - - /** - * @param ctx Hadoop context. - */ - public void start(HadoopContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - log = ctx.kernalContext().log(getClass()); - } - - /** - * Stops manager. - */ - public void stop(boolean cancel) { - // No-op. - } - - /** - * Callback invoked when all grid components are started. 
- */ - public void onKernalStart() throws IgniteCheckedException { - // No-op. - } - - /** - * Callback invoked before all grid components are stopped. - */ - public void onKernalStop(boolean cancel) { - // No-op. - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java deleted file mode 100644 index 9b85060..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopContext.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.HadoopConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffle; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskExecutorAdapter; -import org.apache.ignite.internal.util.typedef.internal.CU; - -/** - * Hadoop accelerator context. - */ -public class HadoopContext { - /** Kernal context. */ - private GridKernalContext ctx; - - /** Hadoop configuration. */ - private HadoopConfiguration cfg; - - /** Job tracker. */ - private HadoopJobTracker jobTracker; - - /** External task executor. */ - private HadoopTaskExecutorAdapter taskExecutor; - - /** */ - private HadoopShuffle shuffle; - - /** Managers list. */ - private List components = new ArrayList<>(); - - /** - * @param ctx Kernal context. - */ - public HadoopContext( - GridKernalContext ctx, - HadoopConfiguration cfg, - HadoopJobTracker jobTracker, - HadoopTaskExecutorAdapter taskExecutor, - HadoopShuffle shuffle - ) { - this.ctx = ctx; - this.cfg = cfg; - - this.jobTracker = add(jobTracker); - this.taskExecutor = add(taskExecutor); - this.shuffle = add(shuffle); - } - - /** - * Gets list of managers. - * - * @return List of managers. 
- */ - public List components() { - return components; - } - - /** - * Gets kernal context. - * - * @return Grid kernal context instance. - */ - public GridKernalContext kernalContext() { - return ctx; - } - - /** - * Gets Hadoop configuration. - * - * @return Hadoop configuration. - */ - public HadoopConfiguration configuration() { - return cfg; - } - - /** - * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}. - * - * @return Local node ID. - */ - public UUID localNodeId() { - return ctx.localNodeId(); - } - - /** - * Gets local node order. - * - * @return Local node order. - */ - public long localNodeOrder() { - assert ctx.discovery() != null; - - return ctx.discovery().localNode().order(); - } - - /** - * @return Hadoop-enabled nodes. - */ - public Collection nodes() { - return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersionEx()); - } - - /** - * @return {@code True} if - */ - public boolean jobUpdateLeader() { - long minOrder = Long.MAX_VALUE; - ClusterNode minOrderNode = null; - - for (ClusterNode node : nodes()) { - if (node.order() < minOrder) { - minOrder = node.order(); - minOrderNode = node; - } - } - - assert minOrderNode != null; - - return localNodeId().equals(minOrderNode.id()); - } - - /** - * @param meta Job metadata. - * @return {@code true} If local node is participating in job execution. - */ - public boolean isParticipating(HadoopJobMetadata meta) { - UUID locNodeId = localNodeId(); - - if (locNodeId.equals(meta.submitNodeId())) - return true; - - HadoopMapReducePlan plan = meta.mapReducePlan(); - - return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); - } - - /** - * @return Jon tracker instance. - */ - public HadoopJobTracker jobTracker() { - return jobTracker; - } - - /** - * @return Task executor. - */ - public HadoopTaskExecutorAdapter taskExecutor() { - return taskExecutor; - } - - /** - * @return Shuffle. - */ - public HadoopShuffle shuffle() { - return shuffle; - } - - /** - * @return Map-reduce planner. - */ - public HadoopMapReducePlanner planner() { - return cfg.getMapReducePlanner(); - } - - /** - * Adds component. - * - * @param c Component to add. - * @return Added manager. 
- */ - private C add(C c) { - components.add(c); - - return c; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java index c3cf8bb..a77e918 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java @@ -27,15 +27,17 @@ import org.apache.ignite.internal.processors.hadoop.HadoopAttributes; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopClasspathUtils; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; +import org.apache.ignite.internal.processors.hadoop.HadoopComponent; +import org.apache.ignite.internal.processors.hadoop.HadoopContext; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; import org.apache.ignite.internal.processors.hadoop.HadoopLocations; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker; -import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffle; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopEmbeddedTaskExecutor; +import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; +import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobMetadata.java deleted file mode 100644 index 9e86770..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobMetadata.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.impl.jobtracker; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP; - -/** - * Hadoop job metadata. Internal object used for distributed job state tracking. - */ -public class HadoopJobMetadata implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private HadoopJobId jobId; - - /** Job info. */ - private HadoopJobInfo jobInfo; - - /** Node submitted job. */ - private UUID submitNodeId; - - /** Map-reduce plan. */ - private HadoopMapReducePlan mrPlan; - - /** Pending splits for which mapper should be executed. */ - private Map pendingSplits; - - /** Pending reducers. */ - private Collection pendingReducers; - - /** Reducers addresses. */ - @GridToStringInclude - private Map reducersAddrs; - - /** Job phase. */ - private HadoopJobPhase phase = PHASE_SETUP; - - /** Fail cause. */ - @GridToStringExclude - private Throwable failCause; - - /** Version. */ - private long ver; - - /** Job counters */ - private HadoopCounters counters = new HadoopCountersImpl(); - - /** - * Empty constructor required by {@link Externalizable}. - */ - public HadoopJobMetadata() { - // No-op. - } - - /** - * Constructor. - * - * @param submitNodeId Submit node ID. - * @param jobId Job ID. - * @param jobInfo Job info. - */ - public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) { - this.jobId = jobId; - this.jobInfo = jobInfo; - this.submitNodeId = submitNodeId; - } - - /** - * Copy constructor. - * - * @param src Metadata to copy. - */ - public HadoopJobMetadata(HadoopJobMetadata src) { - // Make sure to preserve alphabetic order. 
- counters = src.counters; - failCause = src.failCause; - jobId = src.jobId; - jobInfo = src.jobInfo; - mrPlan = src.mrPlan; - pendingSplits = src.pendingSplits; - pendingReducers = src.pendingReducers; - phase = src.phase; - reducersAddrs = src.reducersAddrs; - submitNodeId = src.submitNodeId; - ver = src.ver + 1; - } - - /** - * @return Submit node ID. - */ - public UUID submitNodeId() { - return submitNodeId; - } - - /** - * @param phase Job phase. - */ - public void phase(HadoopJobPhase phase) { - this.phase = phase; - } - - /** - * @return Job phase. - */ - public HadoopJobPhase phase() { - return phase; - } - - /** - * Gets reducers addresses for external execution. - * - * @return Reducers addresses. - */ - public Map reducersAddresses() { - return reducersAddrs; - } - - /** - * Sets reducers addresses for external execution. - * - * @param reducersAddrs Map of addresses. - */ - public void reducersAddresses(Map reducersAddrs) { - this.reducersAddrs = reducersAddrs; - } - - /** - * Sets collection of pending splits. - * - * @param pendingSplits Collection of pending splits. - */ - public void pendingSplits(Map pendingSplits) { - this.pendingSplits = pendingSplits; - } - - /** - * Gets collection of pending splits. - * - * @return Collection of pending splits. - */ - public Map pendingSplits() { - return pendingSplits; - } - - /** - * Sets collection of pending reducers. - * - * @param pendingReducers Collection of pending reducers. - */ - public void pendingReducers(Collection pendingReducers) { - this.pendingReducers = pendingReducers; - } - - /** - * Gets collection of pending reducers. - * - * @return Collection of pending reducers. - */ - public Collection pendingReducers() { - return pendingReducers; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @param mrPlan Map-reduce plan. - */ - public void mapReducePlan(HadoopMapReducePlan mrPlan) { - assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; - - this.mrPlan = mrPlan; - } - - /** - * @return Map-reduce plan. - */ - public HadoopMapReducePlan mapReducePlan() { - return mrPlan; - } - - /** - * @return Job info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * Returns job counters. - * - * @return Collection of counters. - */ - public HadoopCounters counters() { - return counters; - } - - /** - * Sets counters. - * - * @param counters Collection of counters. - */ - public void counters(HadoopCounters counters) { - this.counters = counters; - } - - /** - * @param failCause Fail cause. - */ - public void failCause(Throwable failCause) { - assert failCause != null; - - if (this.failCause == null) // Keep the first error. - this.failCause = failCause; - } - - /** - * @return Fail cause. - */ - public Throwable failCause() { - return failCause; - } - - /** - * @return Version. - */ - public long version() { - return ver; - } - - /** - * @param split Split. - * @return Task number. 
- */ - public int taskNumber(HadoopInputSplit split) { - return pendingSplits.get(split); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, submitNodeId); - out.writeObject(jobId); - out.writeObject(jobInfo); - out.writeObject(mrPlan); - out.writeObject(pendingSplits); - out.writeObject(pendingReducers); - out.writeObject(phase); - out.writeObject(failCause); - out.writeLong(ver); - out.writeObject(reducersAddrs); - out.writeObject(counters); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - submitNodeId = U.readUuid(in); - jobId = (HadoopJobId)in.readObject(); - jobInfo = (HadoopJobInfo)in.readObject(); - mrPlan = (HadoopMapReducePlan)in.readObject(); - pendingSplits = (Map)in.readObject(); - pendingReducers = (Collection)in.readObject(); - phase = (HadoopJobPhase)in.readObject(); - failCause = (Throwable)in.readObject(); - ver = in.readLong(); - reducersAddrs = (Map)in.readObject(); - counters = (HadoopCounters)in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), - "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : - failCause.getClass().getName()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobTracker.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobTracker.java deleted file mode 100644 index 9033ee2..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/jobtracker/HadoopJobTracker.java +++ /dev/null @@ -1,1700 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.impl.jobtracker; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.expiry.ModifiedExpiryPolicy; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.MutableEntry; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; -import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopComponent; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; -import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; -import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus; -import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job; -import org.apache.ignite.internal.util.GridMutex; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CIX1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.SB; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; -import 
org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_COMPLETE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_MAP; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_REDUCE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.ABORT; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMMIT; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE; -import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.SETUP; -import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.COMPLETED; -import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.CRASHED; -import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.FAILED; -import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.RUNNING; - -/** - * Hadoop job tracker. - */ -public class HadoopJobTracker extends HadoopComponent { - /** */ - private final GridMutex mux = new GridMutex(); - - /** */ - private volatile IgniteInternalCache jobMetaPrj; - - /** Projection with expiry policy for finished job updates. */ - private volatile IgniteInternalCache finishedJobMetaPrj; - - /** Map-reduce execution planner. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private HadoopMapReducePlanner mrPlanner; - - /** All the known jobs. */ - private final ConcurrentMap> jobs = new ConcurrentHashMap8<>(); - - /** Locally active jobs. */ - private final ConcurrentMap activeJobs = new ConcurrentHashMap8<>(); - - /** Locally requested finish futures. */ - private final ConcurrentMap> activeFinishFuts = - new ConcurrentHashMap8<>(); - - /** Event processing service. */ - private ExecutorService evtProcSvc; - - /** Component busy lock. */ - private GridSpinReadWriteLock busyLock; - - /** Class to create HadoopJob instances from. */ - private Class jobCls; - - /** Closure to check result of async transform of system cache. */ - private final IgniteInClosure> failsLog = new CI1>() { - @Override public void apply(IgniteInternalFuture gridFut) { - try { - gridFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to transform system cache.", e); - } - } - }; - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void start(final HadoopContext ctx) throws IgniteCheckedException { - super.start(ctx); - - busyLock = new GridSpinReadWriteLock(); - - evtProcSvc = Executors.newFixedThreadPool(1); - - assert jobCls == null; - - HadoopClassLoader ldr = ctx.kernalContext().hadoopHelper().commonClassLoader(); - - try { - jobCls = (Class)ldr.loadClass(HadoopV2Job.class.getName()); - } - catch (Exception ioe) { - throw new IgniteCheckedException("Failed to load job class [class=" - + HadoopV2Job.class.getName() + ']', ioe); - } - } - - /** - * @return Job meta projection. 
- */ - @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private IgniteInternalCache jobMetaCache() { - IgniteInternalCache prj = jobMetaPrj; - - if (prj == null) { - synchronized (mux) { - if ((prj = jobMetaPrj) == null) { - GridCacheAdapter sysCache = ctx.kernalContext().cache() - .internalCache(CU.SYS_CACHE_HADOOP_MR); - - assert sysCache != null; - - mrPlanner = ctx.planner(); - - try { - ctx.kernalContext().resource().injectGeneric(mrPlanner); - } - catch (IgniteCheckedException e) { // Must not happen. - U.error(log, "Failed to inject resources.", e); - - throw new IllegalStateException(e); - } - - jobMetaPrj = prj = sysCache; - - if (ctx.configuration().getFinishedJobInfoTtl() > 0) { - ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( - new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); - - finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc); - } - else - finishedJobMetaPrj = jobMetaPrj; - } - } - } - - return prj; - } - - /** - * @return Projection with expiry policy for finished job updates. - */ - private IgniteInternalCache finishedJobMetaCache() { - IgniteInternalCache prj = finishedJobMetaPrj; - - if (prj == null) { - jobMetaCache(); - - prj = finishedJobMetaPrj; - - assert prj != null; - } - - return prj; - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobMetaCache().context().continuousQueries().executeInternalQuery( - new CacheEntryUpdatedListener() { - @Override public void onUpdated(final Iterable> evts) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process query callback in a separate thread to avoid deadlocks. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() throws IgniteCheckedException { - processJobMetadataUpdates(evts); - } - }); - } - finally { - busyLock.readUnlock(); - } - } - }, - null, - true, - true, - false - ); - - ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(final Event evt) { - if (!busyLock.tryReadLock()) - return; - - try { - // Must process discovery callback in a separate thread to avoid deadlock. - evtProcSvc.submit(new EventHandler() { - @Override protected void body() { - processNodeLeft((DiscoveryEvent)evt); - } - }); - } - finally { - busyLock.readUnlock(); - } - } - }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - busyLock.writeLock(); - - evtProcSvc.shutdown(); - - // Fail all pending futures. - for (GridFutureAdapter fut : activeFinishFuts.values()) - fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); - } - - /** - * Submits execution of Hadoop job to grid. - * - * @param jobId Job ID. - * @param info Job info. - * @return Job completion future. - */ - @SuppressWarnings("unchecked") - public IgniteInternalFuture submit(HadoopJobId jobId, HadoopJobInfo info) { - if (!busyLock.tryReadLock()) { - return new GridFinishedFuture<>(new IgniteCheckedException("Failed to execute map-reduce job " + - "(grid is stopping): " + info)); - } - - try { - long jobPrepare = U.currentTimeMillis(); - - if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) - throw new IgniteCheckedException("Failed to submit job. 
Job with the same ID already exists: " + jobId); - - HadoopJob job = job(jobId, info); - - HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); - - HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info); - - meta.mapReducePlan(mrPlan); - - meta.pendingSplits(allSplits(mrPlan)); - meta.pendingReducers(allReducers(mrPlan)); - - GridFutureAdapter completeFut = new GridFutureAdapter<>(); - - GridFutureAdapter old = activeFinishFuts.put(jobId, completeFut); - - assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']'; - - if (log.isDebugEnabled()) - log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']'); - - long jobStart = U.currentTimeMillis(); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(), - ctx.localNodeId()); - - perfCntr.clientSubmissionEvents(info); - perfCntr.onJobPrepare(jobPrepare); - perfCntr.onJobStart(jobStart); - - if (jobMetaCache().getAndPutIfAbsent(jobId, meta) != null) - throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); - - return completeFut; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to submit job: " + jobId, e); - - return new GridFinishedFuture<>(e); - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Convert Hadoop job metadata to job status. - * - * @param meta Metadata. - * @return Status. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public static HadoopJobStatus status(HadoopJobMetadata meta) { - HadoopJobInfo jobInfo = meta.jobInfo(); - - return new HadoopJobStatus( - meta.jobId(), - jobInfo.jobName(), - jobInfo.user(), - meta.pendingSplits() != null ? meta.pendingSplits().size() : 0, - meta.pendingReducers() != null ? meta.pendingReducers().size() : 0, - meta.mapReducePlan().mappers(), - meta.mapReducePlan().reducers(), - meta.phase(), - meta.failCause() != null, - meta.version() - ); - } - - /** - * Gets hadoop job status for given job ID. - * - * @param jobId Job ID to get status for. - * @return Job status for given job ID or {@code null} if job was not found. - */ - @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; // Grid is stopping. - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - return meta != null ? status(meta) : null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Gets job finish future. - * - * @param jobId Job ID. - * @return Finish future or {@code null}. - * @throws IgniteCheckedException If failed. - */ - @Nullable public IgniteInternalFuture finishFuture(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; // Grid is stopping. - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta == null) - return null; - - if (log.isTraceEnabled()) - log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); - - if (meta.phase() == PHASE_COMPLETE) { - if (log.isTraceEnabled()) - log.trace("Job is complete, returning finished future: " + jobId); - - return new GridFinishedFuture<>(jobId); - } - - GridFutureAdapter fut = F.addIfAbsent(activeFinishFuts, jobId, - new GridFutureAdapter()); - - // Get meta from cache one more time to close the window. 
- meta = jobMetaCache().get(jobId); - - if (log.isTraceEnabled()) - log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); - - if (meta == null) { - fut.onDone(); - - activeFinishFuts.remove(jobId , fut); - } - else if (meta.phase() == PHASE_COMPLETE) { - fut.onDone(jobId, meta.failCause()); - - activeFinishFuts.remove(jobId , fut); - } - - return fut; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Gets job plan by job ID. - * - * @param jobId Job ID. - * @return Job plan. - * @throws IgniteCheckedException If failed. - */ - public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; - - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta != null) - return meta.mapReducePlan(); - - return null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Callback from task executor invoked when a task has been finished. - * - * @param info Task info. - * @param status Task status. - */ - @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) - public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) { - if (!busyLock.tryReadLock()) - return; - - try { - assert status.state() != RUNNING; - - if (log.isDebugEnabled()) - log.debug("Received task finished callback [info=" + info + ", status=" + status + ']'); - - JobLocalState state = activeJobs.get(info.jobId()); - - // Task CRASHes with null fail cause. - assert (status.state() != FAILED) || status.failCause() != null : - "Invalid task status [info=" + info + ", status=" + status + ']'; - - assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)): - "Missing local state for finished task [info=" + info + ", status=" + status + ']'; - - StackedProcessor incrCntrs = null; - - if (status.state() == COMPLETED) - incrCntrs = new IncrementCountersProcessor(null, status.counters()); - - switch (info.type()) { - case SETUP: { - state.onSetupFinished(info, status, incrCntrs); - - break; - } - - case MAP: { - state.onMapFinished(info, status, incrCntrs); - - break; - } - - case REDUCE: { - state.onReduceFinished(info, status, incrCntrs); - - break; - } - - case COMBINE: { - state.onCombineFinished(info, status, incrCntrs); - - break; - } - - case COMMIT: - case ABORT: { - IgniteInternalCache cache = finishedJobMetaCache(); - - cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). - listen(failsLog); - - break; - } - } - } - finally { - busyLock.readUnlock(); - } - } - - /** - * @param jobId Job id. - * @param c Closure of operation. - */ - private void transform(HadoopJobId jobId, EntryProcessor c) { - jobMetaCache().invokeAsync(jobId, c).listen(failsLog); - } - - /** - * Callback from task executor called when process is ready to received shuffle messages. - * - * @param jobId Job ID. - * @param reducers Reducers. - * @param desc Process descriptor. - */ - public void onExternalMappersInitialized(HadoopJobId jobId, Collection reducers, - HadoopProcessDescriptor desc) { - transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); - } - - /** - * Gets all input splits for given hadoop map-reduce plan. - * - * @param plan Map-reduce plan. - * @return Collection of all input splits that should be processed. 
- */ - @SuppressWarnings("ConstantConditions") - private Map allSplits(HadoopMapReducePlan plan) { - Map res = new HashMap<>(); - - int taskNum = 0; - - for (UUID nodeId : plan.mapperNodeIds()) { - for (HadoopInputSplit split : plan.mappers(nodeId)) { - if (res.put(split, taskNum++) != null) - throw new IllegalStateException("Split duplicate."); - } - } - - return res; - } - - /** - * Gets all reducers for this job. - * - * @param plan Map-reduce plan. - * @return Collection of reducers. - */ - private Collection allReducers(HadoopMapReducePlan plan) { - Collection res = new HashSet<>(); - - for (int i = 0; i < plan.reducers(); i++) - res.add(i); - - return res; - } - - /** - * Processes node leave (or fail) event. - * - * @param evt Discovery event. - */ - @SuppressWarnings("ConstantConditions") - private void processNodeLeft(DiscoveryEvent evt) { - if (log.isDebugEnabled()) - log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']'); - - // Check only if this node is responsible for job status updates. - if (ctx.jobUpdateLeader()) { - boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder(); - - // Iteration over all local entries is correct since system cache is REPLICATED. - for (Object metaObj : jobMetaCache().values()) { - HadoopJobMetadata meta = (HadoopJobMetadata)metaObj; - - HadoopJobId jobId = meta.jobId(); - - HadoopMapReducePlan plan = meta.mapReducePlan(); - - HadoopJobPhase phase = meta.phase(); - - try { - if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { - // Failover setup task. - HadoopJob job = job(jobId, meta.jobInfo()); - - Collection setupTask = setupTask(jobId); - - assert setupTask != null; - - ctx.taskExecutor().run(job, setupTask); - } - else if (phase == PHASE_MAP || phase == PHASE_REDUCE) { - // Must check all nodes, even that are not event node ID due to - // multiple node failure possibility. - Collection cancelSplits = null; - - for (UUID nodeId : plan.mapperNodeIds()) { - if (ctx.kernalContext().discovery().node(nodeId) == null) { - // Node has left the grid. - Collection mappers = plan.mappers(nodeId); - - if (cancelSplits == null) - cancelSplits = new HashSet<>(); - - cancelSplits.addAll(mappers); - } - } - - Collection cancelReducers = null; - - for (UUID nodeId : plan.reducerNodeIds()) { - if (ctx.kernalContext().discovery().node(nodeId) == null) { - // Node has left the grid. - int[] reducers = plan.reducers(nodeId); - - if (cancelReducers == null) - cancelReducers = new HashSet<>(); - - for (int rdc : reducers) - cancelReducers.add(rdc); - } - } - - if (cancelSplits != null || cancelReducers != null) - jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException( - "One or more nodes participating in map-reduce job execution failed."), cancelSplits, - cancelReducers)); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cancel job: " + meta, e); - } - } - } - } - - /** - * @param updated Updated cache entries. - * @throws IgniteCheckedException If failed. 
- */ - private void processJobMetadataUpdates( - Iterable> updated) - throws IgniteCheckedException { - UUID locNodeId = ctx.localNodeId(); - - for (CacheEntryEvent entry : updated) { - HadoopJobId jobId = entry.getKey(); - HadoopJobMetadata meta = entry.getValue(); - - if (meta == null || !ctx.isParticipating(meta)) - continue; - - if (log.isDebugEnabled()) - log.debug("Processing job metadata update callback [locNodeId=" + locNodeId + - ", meta=" + meta + ']'); - - try { - ctx.taskExecutor().onJobStateChanged(meta); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process job state changed callback (will fail the job) " + - "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e); - - transform(jobId, new CancelJobProcessor(null, e)); - - continue; - } - - processJobMetaUpdate(jobId, meta, locNodeId); - } - } - - /** - * @param jobId Job ID. - * @param plan Map-reduce plan. - */ - @SuppressWarnings({"unused", "ConstantConditions" }) - private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) { - log.info("Plan for " + jobId); - - SB b = new SB(); - - b.a(" Map: "); - - for (UUID nodeId : plan.mapperNodeIds()) - b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' '); - - log.info(b.toString()); - - b = new SB(); - - b.a(" Reduce: "); - - for (UUID nodeId : plan.reducerNodeIds()) - b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' '); - - log.info(b.toString()); - } - - /** - * @param jobId Job ID. - * @param meta Job metadata. - * @param locNodeId Local node ID. - * @throws IgniteCheckedException If failed. - */ - private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId) - throws IgniteCheckedException { - JobLocalState state = activeJobs.get(jobId); - - HadoopJob job = job(jobId, meta.jobInfo()); - - HadoopMapReducePlan plan = meta.mapReducePlan(); - - switch (meta.phase()) { - case PHASE_SETUP: { - if (ctx.jobUpdateLeader()) { - Collection setupTask = setupTask(jobId); - - if (setupTask != null) - ctx.taskExecutor().run(job, setupTask); - } - - break; - } - - case PHASE_MAP: { - // Check if we should initiate new task on local node. - Collection tasks = mapperTasks(plan.mappers(locNodeId), meta); - - if (tasks != null) - ctx.taskExecutor().run(job, tasks); - - break; - } - - case PHASE_REDUCE: { - if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) { - HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null); - - if (log.isDebugEnabled()) - log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId + - ", jobId=" + jobId + ']'); - - ctx.taskExecutor().run(job, Collections.singletonList(info)); - - break; - } - - Collection tasks = reducerTasks(plan.reducers(locNodeId), job); - - if (tasks != null) - ctx.taskExecutor().run(job, tasks); - - break; - } - - case PHASE_CANCELLING: { - // Prevent multiple task executor notification. - if (state != null && state.onCancel()) { - if (log.isDebugEnabled()) - log.debug("Cancelling local task execution for job: " + meta); - - ctx.taskExecutor().cancelTasks(jobId); - } - - if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) { - if (ctx.jobUpdateLeader()) { - if (state == null) - state = initState(jobId); - - // Prevent running multiple abort tasks. 
- if (state.onAborted()) { - HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null); - - if (log.isDebugEnabled()) - log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId + - ", jobId=" + jobId + ']'); - - ctx.taskExecutor().run(job, Collections.singletonList(info)); - } - } - - break; - } - else { - // Check if there are unscheduled mappers or reducers. - Collection cancelMappers = new ArrayList<>(); - Collection cancelReducers = new ArrayList<>(); - - Collection mappers = plan.mappers(ctx.localNodeId()); - - if (mappers != null) { - for (HadoopInputSplit b : mappers) { - if (state == null || !state.mapperScheduled(b)) - cancelMappers.add(b); - } - } - - int[] rdc = plan.reducers(ctx.localNodeId()); - - if (rdc != null) { - for (int r : rdc) { - if (state == null || !state.reducerScheduled(r)) - cancelReducers.add(r); - } - } - - if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty()) - transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers)); - } - - break; - } - - case PHASE_COMPLETE: { - if (log.isDebugEnabled()) - log.debug("Job execution is complete, will remove local state from active jobs " + - "[jobId=" + jobId + ", meta=" + meta + ']'); - - if (state != null) { - state = activeJobs.remove(jobId); - - assert state != null; - - ctx.shuffle().jobFinished(jobId); - } - - GridFutureAdapter finishFut = activeFinishFuts.remove(jobId); - - if (finishFut != null) { - if (log.isDebugEnabled()) - log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']'); - - finishFut.onDone(jobId, meta.failCause()); - } - - assert job != null; - - if (ctx.jobUpdateLeader()) - job.cleanupStagingDirectory(); - - jobs.remove(jobId); - - if (ctx.jobUpdateLeader()) { - ClassLoader ldr = job.getClass().getClassLoader(); - - try { - String statWriterClsName = job.info().property(HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY); - - if (statWriterClsName != null) { - Class cls = ldr.loadClass(statWriterClsName); - - HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance(); - - HadoopCounters cntrs = meta.counters(); - - writer.write(job, cntrs); - } - } - catch (Exception e) { - log.error("Can't write statistic due to: ", e); - } - } - - job.dispose(false); - - break; - } - - default: - throw new IllegalStateException("Unknown phase: " + meta.phase()); - } - } - - /** - * Creates setup task based on job information. - * - * @param jobId Job ID. - * @return Setup task wrapped in collection. - */ - @Nullable private Collection setupTask(HadoopJobId jobId) { - if (activeJobs.containsKey(jobId)) - return null; - else { - initState(jobId); - - return Collections.singleton(new HadoopTaskInfo(SETUP, jobId, 0, 0, null)); - } - } - - /** - * Creates mapper tasks based on job information. - * - * @param mappers Mapper blocks. - * @param meta Job metadata. - * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. 
- */ - private Collection mapperTasks(Iterable mappers, HadoopJobMetadata meta) { - UUID locNodeId = ctx.localNodeId(); - HadoopJobId jobId = meta.jobId(); - - JobLocalState state = activeJobs.get(jobId); - - Collection tasks = null; - - if (mappers != null) { - if (state == null) - state = initState(jobId); - - for (HadoopInputSplit split : mappers) { - if (state.addMapper(split)) { - if (log.isDebugEnabled()) - log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId + - ", split=" + split + ']'); - - HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); - - if (tasks == null) - tasks = new ArrayList<>(); - - tasks.add(taskInfo); - } - } - } - - return tasks; - } - - /** - * Creates reducer tasks based on job information. - * - * @param reducers Reducers (may be {@code null}). - * @param job Job instance. - * @return Collection of task infos. - */ - private Collection reducerTasks(int[] reducers, HadoopJob job) { - UUID locNodeId = ctx.localNodeId(); - HadoopJobId jobId = job.id(); - - JobLocalState state = activeJobs.get(jobId); - - Collection tasks = null; - - if (reducers != null) { - if (state == null) - state = initState(job.id()); - - for (int rdc : reducers) { - if (state.addReducer(rdc)) { - if (log.isDebugEnabled()) - log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId + - ", rdc=" + rdc + ']'); - - HadoopTaskInfo taskInfo = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null); - - if (tasks == null) - tasks = new ArrayList<>(); - - tasks.add(taskInfo); - } - } - } - - return tasks; - } - - /** - * Initializes local state for given job metadata. - * - * @param jobId Job ID. - * @return Local state. - */ - private JobLocalState initState(HadoopJobId jobId) { - return F.addIfAbsent(activeJobs, jobId, new JobLocalState()); - } - - /** - * Gets or creates job instance. - * - * @param jobId Job ID. - * @param jobInfo Job info. - * @return Job. - * @throws IgniteCheckedException If failed. - */ - @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException { - GridFutureAdapter fut = jobs.get(jobId); - - if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter())) != null) - return fut.get(); - - fut = jobs.get(jobId); - - HadoopJob job = null; - - try { - if (jobInfo == null) { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta == null) - throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId); - - jobInfo = meta.jobInfo(); - } - - job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames(), - ctx.kernalContext().hadoopHelper()); - - job.initialize(false, ctx.localNodeId()); - - fut.onDone(job); - - return job; - } - catch (IgniteCheckedException e) { - fut.onDone(e); - - jobs.remove(jobId, fut); - - if (job != null) { - try { - job.dispose(false); - } - catch (IgniteCheckedException e0) { - U.error(log, "Failed to dispose job: " + jobId, e0); - } - } - - throw e; - } - } - - /** - * Kills job. - * - * @param jobId Job ID. - * @return {@code True} if job was killed. - * @throws IgniteCheckedException If failed. - */ - public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return false; // Grid is stopping. 
- - try { - HadoopJobMetadata meta = jobMetaCache().get(jobId); - - if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) { - HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled."); - - jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err)); - } - } - finally { - busyLock.readUnlock(); - } - - IgniteInternalFuture fut = finishFuture(jobId); - - if (fut != null) { - try { - fut.get(); - } - catch (Exception e) { - if (e.getCause() instanceof HadoopTaskCancelledException) - return true; - } - } - - return false; - } - - /** - * Returns job counters. - * - * @param jobId Job identifier. - * @return Job counters or {@code null} if job cannot be found. - * @throws IgniteCheckedException If failed. - */ - @Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException { - if (!busyLock.tryReadLock()) - return null; - - try { - final HadoopJobMetadata meta = jobMetaCache().get(jobId); - - return meta != null ? meta.counters() : null; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Event handler protected by busy lock. - */ - private abstract class EventHandler implements Runnable { - /** {@inheritDoc} */ - @Override public void run() { - if (!busyLock.tryReadLock()) - return; - - try { - body(); - } - catch (Throwable e) { - U.error(log, "Unhandled exception while processing event.", e); - - if (e instanceof Error) - throw (Error)e; - } - finally { - busyLock.readUnlock(); - } - } - - /** - * Handler body. - */ - protected abstract void body() throws Exception; - } - - /** - * - */ - private class JobLocalState { - /** Mappers. */ - private final Collection currMappers = new HashSet<>(); - - /** Reducers. */ - private final Collection currReducers = new HashSet<>(); - - /** Number of completed mappers. */ - private final AtomicInteger completedMappersCnt = new AtomicInteger(); - - /** Cancelled flag. */ - private boolean cancelled; - - /** Aborted flag. */ - private boolean aborted; - - /** - * @param mapSplit Map split to add. - * @return {@code True} if mapper was added. - */ - private boolean addMapper(HadoopInputSplit mapSplit) { - return currMappers.add(mapSplit); - } - - /** - * @param rdc Reducer number to add. - * @return {@code True} if reducer was added. - */ - private boolean addReducer(int rdc) { - return currReducers.add(rdc); - } - - /** - * Checks whether this split was scheduled for given attempt. - * - * @param mapSplit Map split to check. - * @return {@code True} if mapper was scheduled. - */ - public boolean mapperScheduled(HadoopInputSplit mapSplit) { - return currMappers.contains(mapSplit); - } - - /** - * Checks whether this split was scheduled for given attempt. - * - * @param rdc Reducer number to check. - * @return {@code True} if reducer was scheduled. - */ - public boolean reducerScheduled(int rdc) { - return currReducers.contains(rdc); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { - final HadoopJobId jobId = taskInfo.jobId(); - - if (status.state() == FAILED || status.state() == CRASHED) - transform(jobId, new CancelJobProcessor(prev, status.failCause())); - else - transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP)); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. 
- */ - private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, - final StackedProcessor prev) { - final HadoopJobId jobId = taskInfo.jobId(); - - boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size(); - - if (status.state() == FAILED || status.state() == CRASHED) { - // Fail the whole job. - transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause())); - - return; - } - - IgniteInClosure> cacheUpdater = new CIX1>() { - @Override public void applyx(IgniteInternalFuture f) { - Throwable err = null; - - if (f != null) { - try { - f.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - } - - transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err)); - } - }; - - if (lastMapperFinished) - ctx.shuffle().flush(jobId).listen(cacheUpdater); - else - cacheUpdater.apply(null); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { - HadoopJobId jobId = taskInfo.jobId(); - if (status.state() == FAILED || status.state() == CRASHED) - // Fail the whole job. - transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause())); - else - transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber())); - } - - /** - * @param taskInfo Task info. - * @param status Task status. - * @param prev Previous closure. - */ - private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, - final StackedProcessor prev) { - final HadoopJobId jobId = taskInfo.jobId(); - - if (status.state() == FAILED || status.state() == CRASHED) - // Fail the whole job. - transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause())); - else { - ctx.shuffle().flush(jobId).listen(new CIX1>() { - @Override public void applyx(IgniteInternalFuture f) { - Throwable err = null; - - if (f != null) { - try { - f.get(); - } - catch (IgniteCheckedException e) { - err = e; - } - } - - transform(jobId, new RemoveMappersProcessor(prev, currMappers, err)); - } - }); - } - } - - /** - * @return {@code True} if job was cancelled by this (first) call. - */ - public boolean onCancel() { - if (!cancelled && !aborted) { - cancelled = true; - - return true; - } - - return false; - } - - /** - * @return {@code True} if job was aborted this (first) call. - */ - public boolean onAborted() { - if (!aborted) { - aborted = true; - - return true; - } - - return false; - } - } - - /** - * Update job phase transform closure. - */ - private static class UpdatePhaseProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Phase to update. */ - private final HadoopJobPhase phase; - - /** - * @param prev Previous closure. - * @param phase Phase to update. - */ - private UpdatePhaseProcessor(@Nullable StackedProcessor prev, HadoopJobPhase phase) { - super(prev); - - this.phase = phase; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - cp.phase(phase); - } - } - - /** - * Remove mapper transform closure. - */ - private static class RemoveMappersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final Collection splits; - - /** Error. */ - private final Throwable err; - - /** - * @param prev Previous closure. 
- * @param split Mapper split to remove. - * @param err Error. - */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) { - this(prev, Collections.singletonList(split), err); - } - - /** - * @param prev Previous closure. - * @param splits Mapper splits to remove. - * @param err Error. - */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection splits, - Throwable err) { - super(prev); - - this.splits = splits; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Map splitsCp = new HashMap<>(cp.pendingSplits()); - - for (HadoopInputSplit s : splits) - splitsCp.remove(s); - - cp.pendingSplits(splitsCp); - - if (cp.phase() != PHASE_CANCELLING && err != null) - cp.failCause(err); - - if (err != null) - cp.phase(PHASE_CANCELLING); - - if (splitsCp.isEmpty()) { - if (cp.phase() != PHASE_CANCELLING) - cp.phase(PHASE_REDUCE); - } - } - } - - /** - * Remove reducer transform closure. - */ - private static class RemoveReducerProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final int rdc; - - /** Error. */ - private Throwable err; - - /** - * @param prev Previous closure. - * @param rdc Reducer to remove. - */ - private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) { - super(prev); - - this.rdc = rdc; - } - - /** - * @param prev Previous closure. - * @param rdc Reducer to remove. - * @param err Error. - */ - private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) { - super(prev); - - this.rdc = rdc; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Collection rdcCp = new HashSet<>(cp.pendingReducers()); - - rdcCp.remove(rdc); - - cp.pendingReducers(rdcCp); - - if (err != null) { - cp.phase(PHASE_CANCELLING); - cp.failCause(err); - } - } - } - - /** - * Initialize reducers. - */ - private static class InitializeReducersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Reducers. */ - private final Collection rdc; - - /** Process descriptor for reducers. */ - private final HadoopProcessDescriptor desc; - - /** - * @param prev Previous closure. - * @param rdc Reducers to initialize. - * @param desc External process descriptor. - */ - private InitializeReducersProcessor(@Nullable StackedProcessor prev, - Collection rdc, - HadoopProcessDescriptor desc) { - super(prev); - - assert !F.isEmpty(rdc); - assert desc != null; - - this.rdc = rdc; - this.desc = desc; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Map oldMap = meta.reducersAddresses(); - - Map rdcMap = oldMap == null ? - new HashMap() : new HashMap<>(oldMap); - - for (Integer r : rdc) - rdcMap.put(r, desc); - - cp.reducersAddresses(rdcMap); - } - } - - /** - * Remove reducer transform closure. - */ - private static class CancelJobProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** Mapper split to remove. */ - private final Collection splits; - - /** Reducers to remove. */ - private final Collection rdc; - - /** Error. */ - private final Throwable err; - - /** - * @param prev Previous closure. - * @param err Fail cause. 
- */ - private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) { - this(prev, err, null, null); - } - - /** - * @param prev Previous closure. - * @param splits Splits to remove. - * @param rdc Reducers to remove. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, - Collection splits, - Collection rdc) { - this(prev, null, splits, rdc); - } - - /** - * @param prev Previous closure. - * @param err Error. - * @param splits Splits to remove. - * @param rdc Reducers to remove. - */ - private CancelJobProcessor(@Nullable StackedProcessor prev, - Throwable err, - Collection splits, - Collection rdc) { - super(prev); - - this.splits = splits; - this.rdc = rdc; - this.err = err; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - final HadoopJobPhase currPhase = meta.phase(); - - assert currPhase == PHASE_CANCELLING || currPhase == PHASE_COMPLETE - || err != null: "Invalid phase for cancel: " + currPhase; - - Collection rdcCp = new HashSet<>(cp.pendingReducers()); - - if (rdc != null) - rdcCp.removeAll(rdc); - - cp.pendingReducers(rdcCp); - - Map splitsCp = new HashMap<>(cp.pendingSplits()); - - if (splits != null) { - for (HadoopInputSplit s : splits) - splitsCp.remove(s); - } - - cp.pendingSplits(splitsCp); - - if (currPhase != PHASE_COMPLETE && currPhase != PHASE_CANCELLING) - cp.phase(PHASE_CANCELLING); - - if (err != null) - cp.failCause(err); - } - } - - /** - * Increment counter values closure. - */ - private static class IncrementCountersProcessor extends StackedProcessor { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final HadoopCounters counters; - - /** - * @param prev Previous closure. - * @param counters Task counters to add into job counters. - */ - private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) { - super(prev); - - assert counters != null; - - this.counters = counters; - } - - /** {@inheritDoc} */ - @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - HadoopCounters cntrs = new HadoopCountersImpl(cp.counters()); - - cntrs.merge(counters); - - cp.counters(cntrs); - } - } - - /** - * Abstract stacked closure. - */ - private abstract static class StackedProcessor implements - EntryProcessor, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final StackedProcessor prev; - - /** - * @param prev Previous closure. - */ - private StackedProcessor(@Nullable StackedProcessor prev) { - this.prev = prev; - } - - /** {@inheritDoc} */ - @Override public Void process(MutableEntry e, Object... args) { - HadoopJobMetadata val = apply(e.getValue()); - - if (val != null) - e.setValue(val); - else - e.remove(); - - return null; - } - - /** - * @param meta Old value. - * @return New value. - */ - private HadoopJobMetadata apply(HadoopJobMetadata meta) { - if (meta == null) - return null; - - HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta); - - update(meta, cp); - - return cp; - } - - /** - * Update given job metadata object. - * - * @param meta Initial job metadata. - * @param cp Copy. - */ - protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp); - } -}