Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D6C14200CFC for ; Thu, 14 Sep 2017 01:33:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D564A1609CE; Wed, 13 Sep 2017 23:33:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 825C91609CA for ; Thu, 14 Sep 2017 01:33:18 +0200 (CEST) Received: (qmail 68180 invoked by uid 500); 13 Sep 2017 23:32:54 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 64078 invoked by uid 99); 13 Sep 2017 23:32:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Sep 2017 23:32:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 39F87F582E; Wed, 13 Sep 2017 23:32:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Wed, 13 Sep 2017 23:33:51 -0000 Message-Id: <1c89817f0c3b4bbb891de5ecf98118db@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [64/82] [abbrv] hadoop git commit: YARN-7091. Rename application to service in yarn-native-services. 
Contributed by Jian He archived-at: Wed, 13 Sep 2017 23:33:21 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java new file mode 100644 index 0000000..cb7131e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -0,0 +1,494 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; +import org.apache.hadoop.yarn.service.ContainerFailureTracker; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.ServiceMetrics; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.service.utils.SliderUtils; +import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; +import org.apache.hadoop.yarn.service.monitor.probe.Probe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static 
org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;

/**
 * Runtime model of a single service component.
 *
 * <p>A component owns a set of {@link ComponentInstance}s, asks the
 * AM-RM client for one container per instance, keeps per-component and
 * service-wide metrics up to date, and moves between the
 * {@code INIT}, {@code FLEXING} and {@code STABLE} states in response to
 * {@link ComponentEvent}s.
 *
 * <p>All state transitions are performed by {@link #handle(ComponentEvent)}
 * while holding the write lock; {@link #getState()} takes the read lock.
 */
public class Component implements EventHandler<ComponentEvent> {
  private static final Logger LOG = LoggerFactory.getLogger(Component.class);

  private final org.apache.hadoop.yarn.service.api.records.Component
      componentSpec;
  private final long allocateId;
  private final Priority priority;
  private final ServiceMetrics componentMetrics;
  private final ServiceScheduler scheduler;
  private final ServiceContext context;
  // NOTE(review): kept as a raw type exactly as it appears in the source;
  // confirm the type argument against ServiceScheduler#getAmRMClient().
  private final AMRMClientAsync amrmClient;
  private final AtomicLong instanceIdCounter = new AtomicLong();
  private final Map<ComponentInstanceId, ComponentInstance> compInstances =
      new ConcurrentHashMap<>();
  // component instances to be assigned with a container
  private final List<ComponentInstance> pendingInstances = new LinkedList<>();
  private final ContainerFailureTracker failureTracker;
  private final Probe probe;
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;
  // Read from CONTAINER_FAILURE_THRESHOLD in the component configuration
  // (default 10). Public because other classes read it directly.
  public int maxContainerFailurePerComp;
  // The number of containers failed since last reset. This excludes preempted,
  // disk_failed containers etc. This will be reset to 0 periodically.
  public AtomicInteger currentContainerFailure = new AtomicInteger(0);

  private final StateMachine<ComponentState, ComponentEventType, ComponentEvent>
      stateMachine;
  private final AsyncDispatcher compInstanceDispatcher;
  private static final StateMachineFactory<Component, ComponentState,
      ComponentEventType, ComponentEvent> stateMachineFactory =
      new StateMachineFactory<Component, ComponentState, ComponentEventType,
          ComponentEvent>(INIT)
          // INIT goes to FLEXING once containers have been requested.
          // INIT is listed as a valid post-state as well because
          // FlexComponentTransition returns the current state when the flex
          // is deferred on unsatisfied dependencies.
          .addTransition(INIT, EnumSet.of(INIT, STABLE, FLEXING),
              FLEX, new FlexComponentTransition())

          // container allocated by RM
          .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
              new ContainerAllocatedTransition())
          // container launched on NM
          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
              CONTAINER_STARTED, new ContainerStartedTransition())
          // container failed while flexing
          .addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
              new ContainerCompletedTransition())
          // Flex while previous flex is still in progress
          // NOTE(review): FlexComponentTransition can return STABLE (flex
          // down / no-op), which is not a registered post-state here; left
          // unchanged because accepting STABLE would alter state behavior.
          .addTransition(FLEXING, EnumSet.of(FLEXING), FLEX,
              new FlexComponentTransition())

          // container failed while stable
          .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
              new ContainerCompletedTransition())
          // Ignore surplus container
          .addTransition(STABLE, STABLE, CONTAINER_ALLOCATED,
              new ContainerAllocatedTransition())
          // Flex by user
          // For flex up, go to FLEXING state
          // For flex down, go to STABLE state
          .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
              FLEX, new FlexComponentTransition())
          .installTopology();

  /**
   * Creates the component, registers its metrics and creates one pending
   * component instance per container requested in the spec.
   *
   * @param component  the component specification
   * @param allocateId allocation request id for this component; also used
   *                   (narrowed to int) as the container request priority
   * @param context    service context supplying the scheduler
   */
  public Component(
      org.apache.hadoop.yarn.service.api.records.Component component,
      long allocateId, ServiceContext context) {
    this.allocateId = allocateId;
    this.priority = Priority.newInstance((int) allocateId);
    this.componentSpec = component;
    componentMetrics = ServiceMetrics.register(component.getName(),
        "Metrics for component " + component.getName());
    componentMetrics
        .tag("type", "Metrics type [component or service]", "component");
    this.scheduler = context.scheduler;
    this.context = context;
    amrmClient = scheduler.getAmRMClient();
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
    this.stateMachine = stateMachineFactory.make(this);
    compInstanceDispatcher = scheduler.getCompInstanceDispatcher();
    failureTracker =
        new ContainerFailureTracker(context, this);
    probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
    maxContainerFailurePerComp = componentSpec.getConfiguration()
        .getPropertyInt(CONTAINER_FAILURE_THRESHOLD, 10);
    createNumCompInstances(component.getNumberOfContainers());
  }

  /** Creates {@code count} new pending component instances. */
  private void createNumCompInstances(long count) {
    for (long i = 0; i < count; i++) {
      createOneCompInstance();
    }
  }

  /** Creates one instance with the next id and queues it as pending. */
  private void createOneCompInstance() {
    ComponentInstanceId id =
        new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
            componentSpec.getName());
    ComponentInstance instance = new ComponentInstance(this, id);
    compInstances.put(id, instance);
    pendingInstances.add(instance);
  }

  /**
   * Handles FLEX: records the desired count and scales the component up or
   * down relative to the count stored in the component spec.
   */
  private static class FlexComponentTransition implements
      MultipleArcTransition<Component, ComponentEvent, ComponentState> {
    // For flex up, go to FLEXING state
    // For flex down, go to STABLE state
    @Override
    public ComponentState transition(Component component,
        ComponentEvent event) {
      component.setDesiredContainers((int) event.getDesired());
      if (!component.areDependenciesReady()) {
        LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
            + " satisfied.", component.getName());
        return component.getState();
      }
      if (component.getState() == INIT) {
        // This happens on init
        LOG.info("[INIT COMPONENT {}]: {} instances.", component.getName(),
            event.getDesired());
        component.requestContainers(event.getDesired());
        return FLEXING;
      }
      long before = component.getComponentSpec().getNumberOfContainers();
      long delta = event.getDesired() - before;
      component.getComponentSpec().setNumberOfContainers(event.getDesired());
      if (delta > 0) {
        // Scale up
        LOG.info("[FLEX UP COMPONENT {}]: scaling up from {} to {}",
            component.getName(), before, event.getDesired());
        component.requestContainers(delta);
        component.createNumCompInstances(delta);
        return FLEXING;
      } else if (delta < 0) {
        // scale down
        LOG.info("[FLEX DOWN COMPONENT {}]: scaling down from {} to {}",
            component.getName(), before, event.getDesired());
        List<ComponentInstance> list =
            new ArrayList<>(component.compInstances.values());

        // sort in Most recent -> oldest order, destroy most recent ones.
        list.sort(Collections.reverseOrder());
        // Never index past the instances that actually exist.
        long toRemove = Math.min(-delta, list.size());
        for (int i = 0; i < toRemove; i++) {
          ComponentInstance instance = list.get(i);
          // remove the instance
          component.compInstances.remove(instance.getCompInstanceId());
          component.pendingInstances.remove(instance);
          component.componentMetrics.containersFailed.incr();
          component.componentMetrics.containersRunning.decr();
          // decrement id counter
          component.instanceIdCounter.decrementAndGet();
          instance.destroy();
        }
        return STABLE;
      } else {
        LOG.info("[FLEX COMPONENT {}]: already has {} instances, ignoring",
            component.getName(), event.getDesired());
        return STABLE;
      }
    }
  }

  /** Hands a newly allocated container to a pending component instance. */
  private static class ContainerAllocatedTransition extends BaseTransition {
    @Override
    public void transition(Component component, ComponentEvent event) {
      component.assignContainerToCompInstance(event.getContainer());
    }
  }

  /**
   * Forwards START to the component instance, bumps the running counters and
   * decides whether the component has become stable.
   */
  private static class ContainerStartedTransition implements
      MultipleArcTransition<Component, ComponentEvent, ComponentState> {

    @Override
    public ComponentState transition(Component component,
        ComponentEvent event) {
      component.compInstanceDispatcher.getEventHandler().handle(
          new ComponentInstanceEvent(event.getInstance().getContainerId(),
              START));
      component.incRunningContainers();
      return checkIfStable(component);
    }
  }

  /**
   * @return STABLE when the running-container gauge equals the number of
   *         containers in the component spec, FLEXING otherwise
   */
  private static ComponentState checkIfStable(Component component) {
    // if desired == running
    if (component.componentMetrics.containersRunning.value() == component
        .getComponentSpec().getNumberOfContainers()) {
      return STABLE;
    } else {
      return FLEXING;
    }
  }

  /**
   * Updates metrics for a completed container, puts its instance back on the
   * pending list and forwards STOP (with the status) to the instance.
   */
  private static class ContainerCompletedTransition extends BaseTransition {
    @Override
    public void transition(Component component, ComponentEvent event) {
      component.updateMetrics(event.getStatus());

      // add back to pending list
      component.pendingInstances.add(event.getInstance());
      LOG.info(
          "[COMPONENT {}]: {} completed, num pending comp instances increased to {}.",
          component.getName(), event.getStatus().getContainerId(),
          component.pendingInstances.size());
      component.compInstanceDispatcher.getEventHandler().handle(
          new ComponentInstanceEvent(event.getStatus().getContainerId(),
              STOP).setStatus(event.getStatus()));
    }
  }

  public ServiceMetrics getCompMetrics() {
    return componentMetrics;
  }

  /**
   * Assigns the container to the first pending instance and launches it.
   * If no instance is pending the container is released and counted as
   * surplus.
   */
  private void assignContainerToCompInstance(Container container) {
    if (pendingInstances.isEmpty()) {
      LOG.info(
          "[COMPONENT {}]: No pending component instance left, release surplus container {}",
          getName(), container.getId());
      scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
      componentMetrics.surplusContainers.incr();
      scheduler.getServiceMetrics().surplusContainers.incr();
      return;
    }
    ComponentInstance instance = pendingInstances.remove(0);
    LOG.info(
        "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}",
        getName(), container.getId(), pendingInstances.size());
    instance.setContainer(container);
    scheduler.addLiveCompInstance(container.getId(), instance);
    LOG.info(
        "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
        getName(), container.getId(), instance.getCompInstanceName(),
        container.getNodeId());
    scheduler.getContainerLaunchService()
        .launchCompInstance(scheduler.getApp(), instance, container);
  }

  /**
   * Submits {@code count} container requests sized from the component spec,
   * using this component's priority and allocation request id.
   *
   * @param count number of containers to request
   */
  @SuppressWarnings({ "unchecked" })
  public void requestContainers(long count) {
    Resource resource = Resource
        .newInstance(componentSpec.getResource().getMemoryMB(),
            componentSpec.getResource().getCpus());

    for (long i = 0; i < count; i++) {
      //TODO Once YARN-5468 is done, use that for anti-affinity
      ContainerRequest request =
          ContainerRequest.newBuilder().capability(resource).priority(priority)
              .allocationRequestId(allocateId).relaxLocality(true).build();
      amrmClient.addContainerRequest(request);
    }
  }

  /**
   * Records {@code n} as this component's desired container count and moves
   * the service-wide desired gauge by the computed difference.
   */
  private void setDesiredContainers(int n) {
    // NOTE(review): the difference is taken against the service-wide gauge,
    // as in the original; confirm this is intended for multi-component
    // services.
    int delta = n - scheduler.getServiceMetrics().containersDesired.value();
    if (delta > 0) {
      scheduler.getServiceMetrics().containersDesired.incr(delta);
    } else if (delta < 0) {
      // decr() subtracts its argument, so pass the magnitude; passing the
      // negative delta would raise the gauge instead of lowering it.
      scheduler.getServiceMetrics().containersDesired.decr(-delta);
    }
    componentMetrics.containersDesired.set(n);
  }

  /**
   * Updates component and service counters for a completed container.
   * A SUCCESS exit only bumps the succeeded counters; every other exit status
   * also counts as failed, decrements the running gauges and, when the exit
   * status counts towards node blacklisting, records a node failure.
   */
  private void updateMetrics(ContainerStatus status) {
    switch (status.getExitStatus()) {
    case SUCCESS:
      componentMetrics.containersSucceeded.incr();
      scheduler.getServiceMetrics().containersSucceeded.incr();
      return;
    case PREEMPTED:
      componentMetrics.containersPreempted.incr();
      scheduler.getServiceMetrics().containersPreempted.incr();
      break;
    case DISKS_FAILED:
      componentMetrics.containersDiskFailure.incr();
      scheduler.getServiceMetrics().containersDiskFailure.incr();
      break;
    default:
      break;
    }

    // containersFailed include preempted, disks_failed etc.
    componentMetrics.containersFailed.incr();
    scheduler.getServiceMetrics().containersFailed.incr();

    // dec running container
    decRunningContainers();

    if (Apps.shouldCountTowardsNodeBlacklisting(status.getExitStatus())) {
      String host = scheduler.getLiveInstances().get(status.getContainerId())
          .getNodeId().getHost();
      failureTracker.incNodeFailure(host);
      currentContainerFailure.getAndIncrement();
    }
  }

  /**
   * @return true when every dependency listed in the spec has at least as
   *         many ready instances as desired instances; unknown dependencies
   *         are logged and skipped
   */
  public boolean areDependenciesReady() {
    List<String> dependencies = componentSpec.getDependencies();
    if (SliderUtils.isEmpty(dependencies)) {
      return true;
    }
    for (String dependency : dependencies) {
      Component dependentComponent =
          scheduler.getAllComponents().get(dependency);
      if (dependentComponent == null) {
        LOG.error("Couldn't find dependency {} for {} (should never happen)",
            dependency, getName());
        continue;
      }
      if (dependentComponent.getNumReadyInstances() < dependentComponent
          .getNumDesiredInstances()) {
        LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
                + " instances are ready.", getName(), dependency,
            dependentComponent.getNumReadyInstances(),
            dependentComponent.getNumDesiredInstances());
        return false;
      }
    }
    return true;
  }

  private void incRunningContainers() {
    componentMetrics.containersRunning.incr();
    scheduler.getServiceMetrics().containersRunning.incr();
  }

  public void incContainersReady() {
    componentMetrics.containersReady.incr();
  }

  public void decContainersReady() {
    componentMetrics.containersReady.decr();
  }

  private void decRunningContainers() {
    componentMetrics.containersRunning.decr();
    scheduler.getServiceMetrics().containersRunning.decr();
  }

  public int getNumReadyInstances() {
    return componentMetrics.containersReady.value();
  }

  public int getNumRunningInstances() {
    return componentMetrics.containersRunning.value();
  }

  public int getNumDesiredInstances() {
    return componentMetrics.containersDesired.value();
  }

  /** @return the live (not copied) map of instance id to instance */
  public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() {
    return compInstances;
  }

  public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {
    return this.componentSpec;
  }

  /** Resets the failure counter and the per-node failure tracker. */
  public void resetCompFailureCount() {
    LOG.info("[COMPONENT {}]: Reset container failure count from {} to 0.",
        getName(), currentContainerFailure.get());
    currentContainerFailure.set(0);
    failureTracker.resetContainerFailures();
  }

  public Probe getProbe() {
    return probe;
  }

  public Priority getPriority() {
    return priority;
  }

  public long getAllocateId() {
    return allocateId;
  }

  public String getName() {
    return componentSpec.getName();
  }

  /** @return the current state, read under the read lock */
  public ComponentState getState() {
    this.readLock.lock();

    try {
      return this.stateMachine.getCurrentState();
    } finally {
      this.readLock.unlock();
    }
  }

  public ServiceScheduler getScheduler() {
    return scheduler;
  }

  /**
   * Feeds the event to the state machine under the write lock. An invalid
   * transition is logged and otherwise ignored; a state change is logged.
   */
  @Override
  public void handle(ComponentEvent event) {
    // Acquire before the try so that the finally block can only ever unlock
    // a lock this thread actually holds.
    writeLock.lock();
    try {
      ComponentState oldState = getState();
      try {
        stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitionException e) {
        LOG.error(MessageFormat.format("[COMPONENT {0}]: Invalid event {1} at {2}",
            componentSpec.getName(), event.getType(), oldState), e);
      }
      if (oldState != getState()) {
        LOG.info("[COMPONENT {}] Transitioned from {} to {} on {} event.",
            componentSpec.getName(), oldState, getState(), event.getType());
      }
    } finally {
      writeLock.unlock();
    }
  }

  /** No-op single-arc transition that concrete transitions extend. */
  private static class BaseTransition implements
      SingleArcTransition<Component, ComponentEvent> {

    @Override
    public void transition(Component component,
        ComponentEvent event) {
    }
  }

  public ServiceContext getContext() {
    return context;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java new file mode 100644 index 0000000..d93dcf1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.hadoop.yarn.service.component;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;

/**
 * Event delivered to a component.
 *
 * <p>The component name and event type are fixed at construction. The
 * remaining fields (desired count, container, component instance, container
 * status) are optional payload set through chainable setters; each setter
 * returns this event.
 */
public class ComponentEvent extends AbstractEvent<ComponentEventType> {
  private long desired;
  private final String name;
  private final ComponentEventType type;
  private Container container;
  private ComponentInstance instance;
  private ContainerStatus status;

  /**
   * @param name name carried by the event
   * @param type type of the event
   */
  public ComponentEvent(String name, ComponentEventType type) {
    super(type);
    this.name = name;
    this.type = type;
  }

  public String getName() {
    return name;
  }

  @Override
  public ComponentEventType getType() {
    return type;
  }

  public long getDesired() {
    return desired;
  }

  public ComponentEvent setDesired(long desired) {
    this.desired = desired;
    return this;
  }

  public Container getContainer() {
    return container;
  }

  public ComponentEvent setContainer(Container container) {
    this.container = container;
    return this;
  }

  public ComponentInstance getInstance() {
    return instance;
  }

  public ComponentEvent setInstance(ComponentInstance instance) {
    this.instance = instance;
    return this;
  }

  public ContainerStatus getStatus() {
    return status;
  }

  public ComponentEvent setStatus(ContainerStatus status) {
    this.status = status;
    return this;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java new file mode 100644 index 0000000..6729699 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.hadoop.yarn.service.component;

/**
 * Types of events a component can receive.
 */
public enum ComponentEventType {
  /** Request to change the number of containers of the component. */
  FLEX,
  /** A container has been allocated for the component. */
  CONTAINER_ALLOCATED,
  /** A container of the component has been started. */
  CONTAINER_STARTED,
  /** A container of the component has completed. */
  CONTAINER_COMPLETED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java new file mode 100644 index 0000000..a5f9ff4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.hadoop.yarn.service.component;

/**
 * States of a component's state machine.
 */
public enum ComponentState {
  /** State before any event has been processed. */
  INIT,
  /** Containers are still being requested, allocated or started. */
  FLEXING,
  /** No flex is in progress. */
  STABLE
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java new file mode 100644 index 0000000..7d6525b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.component.instance; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.BoundedAppender; +import org.apache.hadoop.yarn.service.utils.SliderUtils; +import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; +import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.Date; +import java.util.concurrent.ScheduledFuture; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; +import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; + +public class ComponentInstance implements EventHandler, + Comparable { + private static final Logger LOG = + LoggerFactory.getLogger(ComponentInstance.class); + + private StateMachine stateMachine; + private Component component; + private final ReadLock readLock; + private final WriteLock writeLock; + + private ComponentInstanceId compInstanceId = null; + private Path compInstanceDir; + private Container container; + private YarnRegistryViewForProviders yarnRegistryOperations; + private FileSystem fs; + private boolean timelineServiceEnabled = false; + private ServiceTimelinePublisher serviceTimelinePublisher; + private ServiceScheduler scheduler; + private BoundedAppender diagnostics = new BoundedAppender(64 * 1024); + private volatile ScheduledFuture containerStatusFuture; + private volatile ContainerStatus status; + private long containerStartedTime = 0; + // This container object is used for rest API query + private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + + private static final StateMachineFactory + stateMachineFactory = + new StateMachineFactory(INIT) + .addTransition(INIT, STARTED, START, + new ContainerStartedTransition()) + + //From Running + .addTransition(STARTED, INIT, STOP, + new ContainerStoppedTransition()) + .addTransition(STARTED, READY, BECOME_READY, + new ContainerBecomeReadyTransition()) + + // FROM READY + .addTransition(READY, STARTED, 
BECOME_NOT_READY, + new ContainerBecomeNotReadyTransition()) + .addTransition(READY, INIT, STOP, new ContainerStoppedTransition()) + .installTopology(); + + + + public ComponentInstance(Component component, + ComponentInstanceId compInstanceId) { + this.stateMachine = stateMachineFactory.make(this); + this.component = component; + this.compInstanceId = compInstanceId; + this.scheduler = component.getScheduler(); + this.yarnRegistryOperations = + component.getScheduler().getYarnRegistryOperations(); + this.serviceTimelinePublisher = + component.getScheduler().getServiceTimelinePublisher(); + if (YarnConfiguration + .timelineServiceV2Enabled(component.getScheduler().getConfig())) { + this.timelineServiceEnabled = true; + } + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.readLock = lock.readLock(); + this.writeLock = lock.writeLock(); + this.fs = scheduler.getContext().fs.getFileSystem(); + } + + private static class ContainerStartedTransition extends BaseTransition { + @Override public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + // Query container status for ip and host + compInstance.containerStatusFuture = + compInstance.scheduler.executorService.scheduleAtFixedRate( + new ContainerStatusRetriever(compInstance.scheduler, + compInstance.getContainerId(), compInstance), 0, 1, + TimeUnit.SECONDS); + + org.apache.hadoop.yarn.service.api.records.Container container = + new org.apache.hadoop.yarn.service.api.records.Container(); + container.setId(compInstance.getContainerId().toString()); + container.setLaunchTime(new Date()); + container.setState(ContainerState.RUNNING_BUT_UNREADY); + container.setBareHost(compInstance.container.getNodeId().getHost()); + container.setComponentName(compInstance.getCompInstanceName()); + if (compInstance.containerSpec != null) { + // remove the previous container. 
+ compInstance.getCompSpec().removeContainer(compInstance.containerSpec); + } + compInstance.containerSpec = container; + compInstance.getCompSpec().addContainer(container); + compInstance.containerStartedTime = System.currentTimeMillis(); + + if (compInstance.timelineServiceEnabled) { + compInstance.serviceTimelinePublisher + .componentInstanceStarted(container, compInstance); + } + } + } + + private static class ContainerBecomeReadyTransition extends BaseTransition { + @Override + public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + compInstance.component.incContainersReady(); + compInstance.containerSpec.setState(ContainerState.READY); + } + } + + private static class ContainerBecomeNotReadyTransition extends BaseTransition { + @Override + public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + compInstance.component.decContainersReady(); + compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); + } + } + + private static class ContainerStoppedTransition extends BaseTransition { + @Override + public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + // re-ask the failed container. + Component comp = compInstance.component; + comp.requestContainers(1); + LOG.info(compInstance.getCompInstanceId() + + ": Container completed. Requested a new container." + System + .lineSeparator() + " exitStatus={}, diagnostics={}.", + event.getStatus().getExitStatus(), + event.getStatus().getDiagnostics()); + String containerDiag = + compInstance.getCompInstanceId() + ": " + event.getStatus() + .getDiagnostics(); + compInstance.diagnostics.append(containerDiag + System.lineSeparator()); + + boolean shouldExit = false; + // check if it exceeds the failure threshold + if (comp.currentContainerFailure.get() > comp.maxContainerFailurePerComp) { + String exitDiag = MessageFormat.format( + "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. 
Shutting down now... " + + System.lineSeparator(), + comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp); + compInstance.diagnostics.append(exitDiag); + // append to global diagnostics that will be reported to RM. + comp.getScheduler().getDiagnostics().append(containerDiag); + comp.getScheduler().getDiagnostics().append(exitDiag); + LOG.warn(exitDiag); + shouldExit = true; + } + + // clean up registry + // hdfs dir content will be overwritten when a new container gets started, + // so no need remove. + compInstance.scheduler.executorService + .submit(compInstance::cleanupRegistry); + + // remove the failed ContainerId -> CompInstance mapping + comp.getScheduler().removeLiveCompInstance(event.getContainerId()); + + if (compInstance.timelineServiceEnabled) { + // record in ATS + compInstance.serviceTimelinePublisher + .componentInstanceFinished(compInstance, + event.getStatus().getExitStatus(), event.getStatus().getState(), + containerDiag); + } + + compInstance.containerSpec.setState(ContainerState.STOPPED); + if (shouldExit) { + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // in case there's a client polling the comp state, it can be notified. 
+ try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.error("Interrupted on sleep while exiting.", e); + } + ExitUtil.terminate(-1); + } + } + } + + public ComponentInstanceState getState() { + this.readLock.lock(); + + try { + return this.stateMachine.getCurrentState(); + } finally { + this.readLock.unlock(); + } + } + + @Override + public void handle(ComponentInstanceEvent event) { + try { + writeLock.lock(); + ComponentInstanceState oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitionException e) { + LOG.error(getCompInstanceId() + ": Invalid event " + event.getType() + + " at " + oldState, e); + } + if (oldState != getState()) { + LOG.info(getCompInstanceId() + " Transitioned from " + oldState + " to " + + getState() + " on " + event.getType() + " event"); + } + } finally { + writeLock.unlock(); + } + } + + public void setContainer(Container container) { + this.container = container; + this.compInstanceId.setContainerId(container.getId()); + } + + public String getCompInstanceName() { + return compInstanceId.getCompInstanceName(); + } + + public ContainerStatus getContainerStatus() { + return status; + } + + public void updateContainerStatus(ContainerStatus status) { + this.status = status; + org.apache.hadoop.yarn.service.api.records.Container container = + getCompSpec().getContainer(getContainerId().toString()); + if (container != null) { + container.setIp(StringUtils.join(",", status.getIPs())); + container.setHostname(status.getHost()); + if (timelineServiceEnabled) { + serviceTimelinePublisher.componentInstanceUpdated(container); + } + } + updateServiceRecord(yarnRegistryOperations, status); + } + + public ContainerId getContainerId() { + return container.getId(); + } + + public String getCompName() { + return compInstanceId.getCompName(); + } + + public void setCompInstanceDir(Path dir) { + this.compInstanceDir = dir; + } + + public Component getComponent() { + return 
component; + } + + public Container getContainer() { + return container; + } + + public ComponentInstanceId getCompInstanceId() { + return compInstanceId; + } + + public NodeId getNodeId() { + return this.container.getNodeId(); + } + + public org.apache.hadoop.yarn.service.api.records.Component getCompSpec() { + return component.getComponentSpec(); + } + + private static class BaseTransition implements + SingleArcTransition { + + @Override public void transition(ComponentInstance compInstance, + ComponentInstanceEvent event) { + } + } + + public ProbeStatus ping() { + if (component.getProbe() == null) { + ProbeStatus status = new ProbeStatus(); + status.setSuccess(true); + return status; + } + return component.getProbe().ping(this); + } + + // Write service record into registry + private void updateServiceRecord( + YarnRegistryViewForProviders yarnRegistry, ContainerStatus status) { + ServiceRecord record = new ServiceRecord(); + String containerId = status.getContainerId().toString(); + record.set(YarnRegistryAttributes.YARN_ID, containerId); + record.description = getCompInstanceName(); + record.set(YarnRegistryAttributes.YARN_PERSISTENCE, + PersistencePolicies.CONTAINER); + record.set("yarn:ip", status.getIPs()); + record.set("yarn:hostname", status.getHost()); + try { + yarnRegistry + .putComponent(RegistryPathUtils.encodeYarnID(containerId), record); + } catch (IOException e) { + LOG.error( + "Failed to update service record in registry: " + containerId + ""); + } + } + + // Release the container , cleanup registry, hdfs dir, and record in ATS + public void destroy() { + LOG.info(getCompInstanceId() + ": Flexed down by user, destroying."); + diagnostics.append(getCompInstanceId() + ": Flexed down by user"); + if (container != null) { + scheduler.removeLiveCompInstance(container.getId()); + component.getScheduler().getAmRMClient() + .releaseAssignedContainer(container.getId()); + getCompSpec().removeContainer(containerSpec); + } + if (timelineServiceEnabled) { 
+ serviceTimelinePublisher + .componentInstanceFinished(this, KILLED_BY_APPMASTER, COMPLETE, + diagnostics.toString()); + } + scheduler.executorService.submit(this::cleanupRegistryAndCompHdfsDir); + } + + private void cleanupRegistry() { + ContainerId containerId = getContainerId(); + String cid = RegistryPathUtils.encodeYarnID(containerId.toString()); + try { + yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid); + } catch (IOException e) { + LOG.error(getCompInstanceId() + ": Failed to delete registry", e); + } + } + + //TODO Maybe have a dedicated cleanup service. + public void cleanupRegistryAndCompHdfsDir() { + cleanupRegistry(); + try { + if (compInstanceDir != null && fs.exists(compInstanceDir)) { + boolean deleted = fs.delete(compInstanceDir, true); + if (!deleted) { + LOG.error(getCompInstanceId() + + ": Failed to delete component instance dir: " + + compInstanceDir); + } else { + LOG.info(getCompInstanceId() + ": Deleted component instance dir: " + + compInstanceDir); + } + } + } catch (IOException e) { + LOG.warn(getCompInstanceId() + ": Failed to delete directory", e); + } + } + + // Query container status until ip and hostname are available and update + // the service record into registry service + private static class ContainerStatusRetriever implements Runnable { + private ContainerId containerId; + private NodeId nodeId; + private NMClient nmClient; + private ComponentInstance instance; + ContainerStatusRetriever(ServiceScheduler scheduler, + ContainerId containerId, ComponentInstance instance) { + this.containerId = containerId; + this.nodeId = instance.getNodeId(); + this.nmClient = scheduler.getNmClient().getClient(); + this.instance = instance; + } + @Override public void run() { + ContainerStatus status = null; + try { + status = nmClient.getContainerStatus(containerId, nodeId); + } catch (Exception e) { + if (e instanceof YarnException) { + throw new YarnRuntimeException( + instance.compInstanceId + " Failed to get container 
status on " + + nodeId + " , cancelling.", e); + } + LOG.error(instance.compInstanceId + " Failed to get container status on " + + nodeId + ", will try again", e); + return; + } + if (SliderUtils.isEmpty(status.getIPs()) || SliderUtils + .isUnset(status.getHost())) { + return; + } + instance.updateContainerStatus(status); + LOG.info( + instance.compInstanceId + " IP = " + status.getIPs() + ", host = " + + status.getHost() + ", cancel container status retriever"); + instance.containerStatusFuture.cancel(false); + } + } + + @Override + public int compareTo(ComponentInstance to) { + long delta = containerStartedTime - to.containerStartedTime; + if (delta == 0) { + return getCompInstanceId().compareTo(to.getCompInstanceId()); + } else if (delta < 0) { + return -1; + } else { + return 1; + } + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ComponentInstance instance = (ComponentInstance) o; + + if (containerStartedTime != instance.containerStartedTime) + return false; + return compInstanceId.equals(instance.compInstanceId); + } + + @Override public int hashCode() { + int result = compInstanceId.hashCode(); + result = 31 * result + (int) (containerStartedTime ^ (containerStartedTime + >>> 32)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java new file mode 100644 index 0000000..707b034 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.component.instance; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class ComponentInstanceEvent + extends AbstractEvent { + + private ContainerId id; + private ContainerStatus status; + private boolean shouldDestroy = false; + + public ComponentInstanceEvent(ContainerId containerId, + ComponentInstanceEventType componentInstanceEventType) { + super(componentInstanceEventType); + this.id = containerId; + } + + public ContainerId getContainerId() { + return id; + } + + public ContainerStatus getStatus() { + return this.status; + } + + public ComponentInstanceEvent setStatus(ContainerStatus status) { + this.status = status; + return this; + } + + public void setShouldDestroy() { + shouldDestroy = true; + } + + public boolean shouldDestroy() { + return shouldDestroy; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java new file mode 100644 index 0000000..1a880ba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java @@ -0,0 +1,26 @@ +/** + * 
/**
 * Kinds of events a component instance's state machine reacts to.
 */
public enum ComponentInstanceEventType {
  /** The instance's container has started (INIT to STARTED). */
  START,

  /** The instance's container has stopped (STARTED or READY back to INIT). */
  STOP,

  /** The instance became ready (STARTED to READY). */
  BECOME_READY,

  /** The instance is no longer ready (READY back to STARTED). */
  BECOME_NOT_READY;
}
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.component.instance; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class ComponentInstanceId implements Comparable { + + private long Id; + private String name; + private ContainerId containerId; + + public ComponentInstanceId(long id, String name) { + Id = id; + this.name = name; + } + + public long getId() { + return Id; + } + + public String getCompName() { + return name; + } + + public String getCompInstanceName() { + return getCompName() + "-" + getId(); + } + + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } + + @Override + public String toString() { + if (containerId == null) { + return "[COMPINSTANCE " + getCompInstanceName() + "]"; + } else { + return "[COMPINSTANCE " + getCompInstanceName() + " : " + containerId + "]"; + } + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ComponentInstanceId that = (ComponentInstanceId) o; + + if (getId() != that.getId()) + return false; + return getCompName() != null ? 
getCompName().equals(that.getCompName()) : + that.getCompName() == null; + + } + + @Override public int hashCode() { + int result = (int) (getId() ^ (getId() >>> 32)); + result = 31 * result + (getCompName() != null ? getCompName().hashCode() : 0); + return result; + } + + @Override + public int compareTo(ComponentInstanceId to) { + int delta = this.getCompName().compareTo(to.getCompName()); + if (delta == 0) { + return Long.compare(this.getId(), to.getId()); + } else if (delta < 0) { + return -1; + } else { + return 1; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java new file mode 100644 index 0000000..f5de5cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Lifecycle states of a component instance.
 */
public enum ComponentInstanceState {
  /** Initial state; also the state an instance returns to on STOP. */
  INIT,

  /** The container has started but the instance is not ready. */
  STARTED,

  /** The instance is ready. */
  READY,

  /** Upgrade state. */
  UPGRADING;
}
/**
 * Paths, parameter names, property names and error codes used by the
 * services REST API.
 */
public interface RestApiConstants {

  // Rest endpoints
  String CONTEXT_ROOT = "/ws/v1";
  String VERSION = "/services/version";
  String SERVICE_ROOT_PATH = "/services";
  String SERVICE_PATH = "/services/{service_name}";
  String COMPONENT_PATH =
      "/services/{service_name}/components/{component_name}";

  // Path parameter names; they match the {placeholders} in the paths above
  String SERVICE_NAME = "service_name";
  String COMPONENT_NAME = "component_name";

  String DEFAULT_COMPONENT_NAME = "default";

  String PROPERTY_REST_SERVICE_HOST = "REST_SERVICE_HOST";
  String PROPERTY_REST_SERVICE_PORT = "REST_SERVICE_PORT";

  // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
  Long DEFAULT_UNLIMITED_LIFETIME = -1L;

  Integer ERROR_CODE_APP_DOES_NOT_EXIST = 404001;
  Integer ERROR_CODE_APP_IS_NOT_RUNNING = 404002;
  Integer ERROR_CODE_APP_SUBMITTED_BUT_NOT_RUNNING_YET = 404003;
  Integer ERROR_CODE_APP_NAME_INVALID = 404004;
}
100644 index 0000000..ee270cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.service.conf; + +import org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes; + +public interface SliderExitCodes extends LauncherExitCodes { + + /** + * starting point for exit codes; not an exception itself + */ + int _EXIT_CODE_BASE = 64; + + /** + * service entered the failed state: {@value} + */ + int EXIT_YARN_SERVICE_FAILED = 65; + + /** + * service was killed: {@value} + */ + int EXIT_YARN_SERVICE_KILLED = 66; + + /** + * timeout on monitoring client: {@value} + */ + int EXIT_TIMED_OUT = 67; + + /** + * service finished with an error: {@value} + */ + int EXIT_YARN_SERVICE_FINISHED_WITH_ERROR = 68; + + /** + * the service instance is unknown: {@value} + */ + int EXIT_UNKNOWN_INSTANCE = 69; + + /** + * the service instance is in the wrong state for that operation: {@value} + */ + int EXIT_BAD_STATE = 70; + + /** + * A spawned master process failed + */ + int EXIT_PROCESS_FAILED = 71; + + /** + * The instance failed -too many containers were + * failing or some other threshold was reached + */ + int EXIT_DEPLOYMENT_FAILED = 72; + + /** + * The service is live -and the requested operation + * does not work if the cluster is running + */ + int EXIT_APPLICATION_IN_USE = 73; + + /** + * There already is an service instance of that name + * when an attempt is made to create a new instance + */ + int EXIT_INSTANCE_EXISTS = 75; + + /** + * Exit code when the configurations in valid/incomplete: {@value} + */ + int EXIT_BAD_CONFIGURATION = 77; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java new file mode 100644 index 0000000..1968e95 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+
+/**
+ * Configuration property names and default values for YARN services, plus
+ * helpers that look a property up in the user-supplied configuration first
+ * and in the system configuration second.
+ */
+public class YarnServiceConf {
+
+  // ServiceClient -> Service AppMaster retry settings
+  public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
+  public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
+
+  // Container-failure retry settings
+  public static final String CONTAINER_RETRY_MAX = "yarn.service.container-failure.retry.max";
+  public static final String CONTAINER_RETRY_INTERVAL = "yarn.service.container-failure.retry-interval";
+
+  public static final String AM_RESTART_MAX = "yarn.service.am-restart.max-attempts";
+  public static final String AM_RESOURCE_MEM = "yarn.service.am-resource.memory";
+  public static final long DEFAULT_KEY_AM_RESOURCE_MEM = 1024;
+
+  public static final String YARN_QUEUE = "yarn.service.queue";
+
+  public static final String API_SERVER_ADDRESS = "yarn.service.api-server.address";
+  public static final String DEFAULT_API_SERVER_ADDRESS = "0.0.0.0:";
+  public static final int DEFAULT_API_SERVER_PORT = 9191;
+
+  /**
+   * Property naming the yarn service base path.
+   * Defaults to HomeDir/.yarn/
+   */
+  public static final String YARN_SERVICE_BASE_PATH = "yarn.service.base.path";
+
+  //TODO rename (the keys below still carry the "slider." prefix)
+  /** Declares that a keytab must be provided. */
+  public static final String KEY_AM_LOGIN_KEYTAB_REQUIRED = "slider.am.login.keytab.required";
+  public static final String KEY_AM_LOGIN_KEYTAB_NAME = "slider.am.login.keytab.name";
+  public static final String KEY_HDFS_KEYTAB_DIR = "slider.hdfs.keytab.dir";
+  public static final String KEY_AM_KEYTAB_LOCAL_PATH = "slider.am.keytab.local.path";
+
+  /**
+   * Maximum number of failed containers (in a single component)
+   * before the app exits.
+   */
+  public static final String CONTAINER_FAILURE_THRESHOLD =
+      "yarn.service.container-failure-per-component.threshold";
+
+  /**
+   * Maximum number of container failures on a node before the node is
+   * blacklisted.
+   */
+  public static final String NODE_BLACKLIST_THRESHOLD =
+      "yarn.service.node-blacklist.threshold";
+
+  /**
+   * Window, in seconds, after which the failure counts behind
+   * CONTAINER_FAILURE_THRESHOLD and NODE_BLACKLIST_THRESHOLD are reset.
+   */
+  public static final String CONTAINER_FAILURE_WINDOW =
+      "yarn.service.failure-count-reset.window";
+
+  /**
+   * Interval between readiness checks.
+   */
+  public static final String READINESS_CHECK_INTERVAL = "yarn.service.readiness-check-interval.seconds";
+  public static final int DEFAULT_READINESS_CHECK_INTERVAL = 30; // seconds
+
+  /**
+   * Look up a long property, preferring the user configuration over the
+   * system configuration.
+   *
+   * @param name name of the property
+   * @param defaultValue value used when neither userConf nor systemConf
+   *                     defines the property
+   * @param userConf Configuration provided by client in the JSON definition
+   * @param systemConf The YarnConfiguration in the system.
+   * @return long value for the property
+   */
+  public static long getLong(String name, long defaultValue,
+      Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
+    // The system-level value (or the supplied default) is the fallback for
+    // the user-level lookup.
+    final long systemValue = systemConf.getLong(name, defaultValue);
+    return userConf.getPropertyLong(name, systemValue);
+  }
+
+  /**
+   * Look up an int property, preferring the user configuration over the
+   * system configuration.
+   *
+   * @param name name of the property
+   * @param defaultValue value used when neither userConf nor systemConf
+   *                     defines the property
+   * @param userConf Configuration provided by client in the JSON definition
+   * @param systemConf The YarnConfiguration in the system.
+   * @return int value for the property
+   */
+  public static int getInt(String name, int defaultValue,
+      Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
+    final int systemValue = systemConf.getInt(name, defaultValue);
+    return userConf.getPropertyInt(name, systemValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
new file mode 100644
index 0000000..e5ed703
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+/**
+ * String constants for the YARN service code: directory names, file names,
+ * an application type and system-property / environment-variable names.
+ */
+public interface YarnServiceConstants {
+
+  /**
+   * The path under which cluster and temp data are stored
+   */
+  String SERVICE_BASE_DIRECTORY = ".yarn";
+
+  /**
+   * The paths under which Service AM dependency libraries are stored.
+   * DEPENDENCY_DIR is a format string with a single %s placeholder that the
+   * caller fills in.
+   */
+  String DEPENDENCY_LOCALIZED_DIR_LINK = "service_dep";
+  String DEPENDENCY_DIR = "/yarn-services/%s/";
+  String DEPENDENCY_TAR_GZ_FILE_NAME = "service-dep";
+  String DEPENDENCY_TAR_GZ_FILE_EXT = ".tar.gz";
+  String DEPENDENCY_DIR_PERMISSIONS = "755";
+
+  /**
+   * Service type for YARN service
+   */
+  String APP_TYPE = "yarn-service";
+
+  // Directory names; presumably relative to a service's own directory --
+  // TODO confirm against the callers.
+  String KEYTAB_DIR = "keytabs";
+  String RESOURCE_DIR = "resources";
+
+
+  String SERVICES_DIRECTORY = "services";
+
+  /**
+   * JVM property to define the service lib directory;
+   * this is set by the yarn.sh script
+   */
+  String PROPERTY_LIB_DIR = "service.libdir";
+
+  /**
+   * name of generated dir for this conf
+   */
+  String SUBMITTED_CONF_DIR = "conf";
+
+  /**
+   * Service AM log4j file name
+   */
+  String YARN_SERVICE_LOG4J_FILENAME = "yarnservice-log4j.properties";
+
+  /**
+   * Log4j sysprop to name the resource
+   */
+  String SYSPROP_LOG4J_CONFIGURATION = "log4j.configuration";
+
+  /**
+   * sysprop for Service AM log4j directory
+   */
+  String SYSPROP_LOG_DIR = "LOG_DIR";
+
+  String TMP_DIR_PREFIX = "tmp";
+
+
+  String SERVICE_CORE_JAR = "yarn-service-core.jar";
+
+  // File names for the Service AM's stdout / stderr (going by the names).
+  String STDOUT_AM = "serviceam-out.txt";
+  String STDERR_AM = "serviceam-err.txt";
+
+  /** Name of the environment variable carrying the Hadoop user name. */
+  String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+
+  // NOTE(review): same value as SUBMITTED_CONF_DIR above.
+  String APP_CONF_DIR = "conf";
+
+  String APP_LIB_DIR = "lib";
+
+  String OUT_FILE = "stdout.txt";
+  String ERR_FILE = "stderr.txt";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
new file mode 100644
index 0000000..e4eae20
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.containerlaunch;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.apache.hadoop.yarn.service.provider.docker.DockerKeys.DEFAULT_DOCKER_NETWORK;
+
+/**
+ * Launcher of applications: base class.
+ *
+ * Collects commands, environment variables, local resources, credentials
+ * and docker settings, and copies them into a ContainerLaunchContext in
+ * {@link #completeContainerLaunch()}.
+ */
+public class AbstractLauncher {
+  private static final Logger log =
+      LoggerFactory.getLogger(AbstractLauncher.class);
+  public static final String CLASSPATH = "CLASSPATH";
+  /**
+   * Filesystem to use for the launch
+   */
+  protected final CoreFileSystem coreFileSystem;
+  /**
+   * Env vars; set up at final launch stage
+   */
+  protected final Map<String, String> envVars = new HashMap<>();
+  protected final ContainerLaunchContext containerLaunchContext =
+      Records.newRecord(ContainerLaunchContext.class);
+  protected final List<String> commands = new ArrayList<>(20);
+  /** Local resources, keyed by the sub path passed to addLocalResource. */
+  protected final Map<String, LocalResource> localResources = new HashMap<>();
+  /** Mount paths, keyed by the same sub path as localResources. */
+  protected final Map<String, String> mountPaths = new HashMap<>();
+  private final Map<String, ByteBuffer> serviceData = new HashMap<>();
+  // security
+  protected final Credentials credentials;
+  protected boolean yarnDockerMode = false;
+  // The three unset fields below stay null until their setter is called.
+  protected String dockerImage;
+  protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
+  protected String dockerHostname;
+  protected String runPrivilegedContainer;
+
+
+  /**
+   * Create instance.
+   * @param coreFileSystem filesystem
+   * @param credentials initial set of credentials -null is permitted, in
+   *                    which case an empty Credentials instance is used
+   */
+  public AbstractLauncher(
+      CoreFileSystem coreFileSystem,
+      Credentials credentials) {
+    this.coreFileSystem = coreFileSystem;
+    this.credentials = credentials != null ? credentials : new Credentials();
+  }
+
+  /**
+   * Enable or disable the docker-specific environment set up by
+   * {@link #completeContainerLaunch()}.
+   * @param yarnDockerMode true to add the docker runtime env vars
+   */
+  public void setYarnDockerMode(boolean yarnDockerMode) {
+    this.yarnDockerMode = yarnDockerMode;
+  }
+
+  /**
+   * Get the env vars to work on
+   * @return the live map of env vars
+   */
+  public Map<String, String> getEnv() {
+    return envVars;
+  }
+
+  /**
+   * Get the launch commands.
+   * @return the live list of commands
+   */
+  public List<String> getCommands() {
+    return commands;
+  }
+
+  /**
+   * Register a local resource.
+   * @param subPath key under which the resource is stored
+   * @param resource the resource
+   */
+  public void addLocalResource(String subPath, LocalResource resource) {
+    localResources.put(subPath, resource);
+  }
+
+  /**
+   * Register a local resource together with a mount path for it.
+   * @param subPath key under which the resource is stored
+   * @param resource the resource
+   * @param mountPath mount path recorded for subPath; only used when docker
+   *                  mode is enabled
+   */
+  public void addLocalResource(String subPath, LocalResource resource, String mountPath) {
+    localResources.put(subPath, resource);
+    mountPaths.put(subPath, mountPath);
+  }
+
+  /**
+   * Accessor to the credentials
+   * @return the credentials associated with this launcher
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+
+  /**
+   * Append a command to the launch commands.
+   * @param cmd command to add
+   */
+  public void addCommand(String cmd) {
+    commands.add(cmd);
+  }
+
+  /**
+   * Complete the launch context (copy in env vars, etc).
+   * @return the container to launch
+   * @throws IOException declared by this method; the visible candidate
+   * source is the credential marshalling call
+   */
+  public ContainerLaunchContext completeContainerLaunch() throws IOException {
+
+    // Only build the joined command string when it will actually be logged.
+    if (log.isDebugEnabled()) {
+      log.debug("Completed setting up container command {}",
+          SliderUtils.join(commands, " ", false));
+    }
+    containerLaunchContext.setCommands(commands);
+
+    //env variables
+    if (log.isDebugEnabled()) {
+      log.debug("Environment variables");
+      for (Map.Entry<String, String> envPair : envVars.entrySet()) {
+        log.debug(" \"{}\"=\"{}\"", envPair.getKey(), envPair.getValue());
+      }
+    }
+    containerLaunchContext.setEnvironment(envVars);
+
+    //service data
+    if (log.isDebugEnabled()) {
+      log.debug("Service Data size");
+      for (Map.Entry<String, ByteBuffer> entry : serviceData.entrySet()) {
+        log.debug("\"{}\"=> {} bytes of data", entry.getKey(),
+            entry.getValue().array().length);
+      }
+    }
+    containerLaunchContext.setServiceData(serviceData);
+
+    // resources
+    dumpLocalResources();
+    containerLaunchContext.setLocalResources(localResources);
+
+    //tokens
+    log.debug("{} tokens", credentials.numberOfTokens());
+    containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
+        credentials));
+
+    if (yarnDockerMode) {
+      addDockerEnvironment();
+    }
+
+    return containerLaunchContext;
+  }
+
+  /**
+   * Add the docker runtime variables to the launch context's environment.
+   */
+  private void addDockerEnvironment() {
+    // NOTE(review): dockerImage, dockerHostname and runPrivilegedContainer
+    // are null unless their setters were called -- confirm that the
+    // environment map and its consumers tolerate null values.
+    Map<String, String> env = containerLaunchContext.getEnvironment();
+    env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", dockerNetwork);
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_HOSTNAME",
+        dockerHostname);
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER", runPrivilegedContainer);
+    // Comma-separated list of subPath:mountPath pairs.
+    StringBuilder sb = new StringBuilder();
+    for (Entry<String, String> mount : mountPaths.entrySet()) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(mount.getKey());
+      sb.append(":");
+      sb.append(mount.getValue());
+    }
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_LOCAL_RESOURCE_MOUNTS", sb.toString());
+    log.info("yarn docker env var has been set {}", containerLaunchContext.getEnvironment());
+  }
+
+  /**
+   * Set a retry context on the launch context that retries on all errors.
+   * @param maxRetries passed through to ContainerRetryContext
+   * @param retryInterval passed through to ContainerRetryContext
+   */
+  public void setRetryContext(int maxRetries, int retryInterval) {
+    ContainerRetryContext retryContext = ContainerRetryContext
+        .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, maxRetries,
+            retryInterval);
+    containerLaunchContext.setContainerRetryContext(retryContext);
+  }
+
+  /**
+   * Dump local resources at debug level
+   */
+  private void dumpLocalResources() {
+    if (log.isDebugEnabled()) {
+      log.debug("{} resources: ", localResources.size());
+      for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+        String key = entry.getKey();
+        LocalResource val = entry.getValue();
+        // Parameterized so the key is never interpreted as a format string.
+        log.debug("{}={}", key, SliderUtils.stringify(val.getResource()));
+      }
+    }
+  }
+
+  /**
+   * This is critical for an insecure cluster -it passes
+   * down the username to YARN, and so gives the code running
+   * in containers the rights it needs to work with
+   * data.
+   * @throws IOException problems working with current user
+   */
+  protected void propagateUsernameInInsecureCluster() throws IOException {
+    //insecure cluster: propagate user name via env variable
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
+    envVars.put(YarnServiceConstants.HADOOP_USER_NAME, userName);
+  }
+
+  /**
+   * Utility method to set up the classpath
+   * @param classpath classpath to use
+   */
+  public void setClasspath(ClasspathConstructor classpath) {
+    setEnv(CLASSPATH, classpath.buildClasspath());
+  }
+
+  /**
+   * Set an environment variable in the launch context
+   * @param var variable name (must be non null)
+   * @param value value (must be non null)
+   * @throws IllegalArgumentException if either argument is null
+   */
+  public void setEnv(String var, String value) {
+    Preconditions.checkArgument(var != null, "null variable name");
+    Preconditions.checkArgument(value != null, "null value");
+    envVars.put(var, value);
+  }
+
+
+  /**
+   * Copy every entry of the map into the env vars; unlike
+   * {@link #setEnv(String, String)} no null checks are applied.
+   * @param map variables to add
+   */
+  public void putEnv(Map<String, String> map) {
+    envVars.putAll(map);
+  }
+
+
+  public void setDockerImage(String dockerImage) {
+    this.dockerImage = dockerImage;
+  }
+
+  public void setDockerNetwork(String dockerNetwork) {
+    this.dockerNetwork = dockerNetwork;
+  }
+
+  public void setDockerHostname(String dockerHostname) {
+    this.dockerHostname = dockerHostname;
+  }
+
+  /**
+   * Record whether the container should run privileged; stored as the
+   * string "true" or "false".
+   * @param runPrivilegedContainer flag to record
+   */
+  public void setRunPrivilegedContainer(boolean runPrivilegedContainer) {
+    this.runPrivilegedContainer = Boolean.toString(runPrivilegedContainer);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ae16ae9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
new file mode 100644
index 0000000..22b3877
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.containerlaunch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * build a classpath -allows for entries to be injected in front of
+ * YARN classpath as well as behind, adds appropriate separators,
+ * extraction of local classpath, etc.
+ */
+public class ClasspathConstructor {
+
+  public static final String CLASS_PATH_SEPARATOR = ApplicationConstants.CLASS_PATH_SEPARATOR;
+  /** Classpath entries, in the order they will be joined. */
+  private final List<String> pathElements = new ArrayList<>();
+
+  public ClasspathConstructor() {
+  }
+
+
+  /**
+   * Get the list of JARs from the YARN settings
+   * @param config configuration
+   * @return the trimmed entries of the YARN application classpath setting
+   * (falling back to the default cross-platform application classpath), or
+   * an empty list if the lookup returns null
+   */
+  public List<String> yarnApplicationClasspath(Configuration config) {
+    String[] cp = config.getTrimmedStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH);
+    return cp != null ? Arrays.asList(cp) : new ArrayList<String>(0);
+
+  }
+
+
+  @Override
+  public String toString() {
+    return buildClasspath();
+  }
+
+  /**
+   * Join all entries with {@link #CLASS_PATH_SEPARATOR}.
+   * @return the classpath string
+   */
+  public String buildClasspath() {
+    return SliderUtils.join(pathElements,
+        CLASS_PATH_SEPARATOR,
+        false);
+  }
+
+  /**
+   * Get a read-only view of the path list
+   * @return an unmodifiable view of the entries
+   */
+  public List<String> getPathElements() {
+    return Collections.unmodifiableList(pathElements);
+  }
+
+  /**
+   * Append an entry
+   * @param path path
+   */
+  public void append(String path) {
+    pathElements.add(path);
+  }
+
+  /**
+   * Insert a path at the front of the list. This places it ahead of
+   * the standard YARN artifacts
+   * @param path path to the JAR. Absolute or relative -on the target
+   * system
+   */
+  public void insert(String path) {
+    pathElements.add(0, path);
+  }
+
+  /**
+   * Append every path in the collection, in iteration order.
+   * @param paths paths to append
+   */
+  public void appendAll(Collection<String> paths) {
+    pathElements.addAll(paths);
+  }
+
+  /**
+   * Insert every path in the collection at the front of the list, keeping
+   * the collection's iteration order.
+   * @param paths paths to insert
+   */
+  public void insertAll(Collection<String> paths) {
+    pathElements.addAll(0, paths);
+  }
+
+
+  /**
+   * Append a lib dir entry (directory plus "*" wildcard).
+   * @param pathToLibDir path to the directory
+   */
+  public void addLibDir(String pathToLibDir) {
+    append(buildLibDir(pathToLibDir));
+  }
+
+  /**
+   * Insert a lib dir entry (directory plus "*" wildcard) at the front.
+   * @param pathToLibDir path to the directory
+   */
+  public void insertLibDir(String pathToLibDir) {
+    insert(buildLibDir(pathToLibDir));
+  }
+
+  /**
+   * Append a class directory entry (directory with trailing "/").
+   * @param pathToDir path to the directory
+   */
+  public void addClassDirectory(String pathToDir) {
+    append(appendDirectoryTerminator(pathToDir));
+  }
+
+  /**
+   * Insert a class directory entry (directory with trailing "/") at the
+   * front of the list.
+   * @param pathToDir path to the directory
+   */
+  public void insertClassDirectory(String pathToDir) {
+    // A class directory is added as "dir/", matching addClassDirectory;
+    // the "dir/*" wildcard form is reserved for lib dirs.
+    insert(appendDirectoryTerminator(pathToDir));
+  }
+
+
+  /**
+   * Append the remote CLASSPATH environment variable reference.
+   */
+  public void addRemoteClasspathEnvVar() {
+    append(ApplicationConstants.Environment.CLASSPATH.$$());
+  }
+
+
+  /**
+   * Insert the remote CLASSPATH environment variable reference at the
+   * front of the list.
+   */
+  public void insertRemoteClasspathEnvVar() {
+    insert(ApplicationConstants.Environment.CLASSPATH.$$());
+  }
+
+
+  /**
+   * Build a lib dir path
+   * @param pathToLibDir path to the directory; may or may not end with a
+   * trailing slash
+   * @return a path to a lib dir that is compatible with the java classpath
+   */
+  public String buildLibDir(String pathToLibDir) {
+    String dir = appendDirectoryTerminator(pathToLibDir);
+    dir += "*";
+    return dir;
+  }
+
+  /**
+   * Trim the path and make sure it ends with "/".
+   * @param pathToLibDir path to the directory
+   * @return the trimmed path with a trailing "/"
+   */
+  private String appendDirectoryTerminator(String pathToLibDir) {
+    String dir = pathToLibDir.trim();
+    if (!dir.endsWith("/")) {
+      dir += "/";
+    }
+    return dir;
+  }
+
+  /**
+   * Split a classpath. This uses the local path separator so MUST NOT
+   * be used to work with remote classpaths
+   * @param localpath local path
+   * @return the entries of the classpath
+   */
+  public Collection<String> splitClasspath(String localpath) {
+    String separator = System.getProperty("path.separator");
+    return StringUtils.getStringCollection(localpath, separator);
+  }
+
+  /**
+   * Get the local JVM classpath split up
+   * @return the list of entries in the java.class.path system property
+   */
+  public Collection<String> localJVMClasspath() {
+    return splitClasspath(System.getProperty("java.class.path"));
+  }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org