Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2A46910F8F for ; Thu, 20 Feb 2014 02:58:38 +0000 (UTC) Received: (qmail 77224 invoked by uid 500); 20 Feb 2014 02:58:37 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 77187 invoked by uid 500); 20 Feb 2014 02:58:36 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 77180 invoked by uid 99); 20 Feb 2014 02:58:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Feb 2014 02:58:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 907AD928A0C; Thu, 20 Feb 2014 02:58:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Message-Id: <5759f22c5f494ff2bdca45f281e1628e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Made container states more consistent, changed yarn target provider logic Date: Thu, 20 Feb 2014 02:58:35 +0000 (UTC) Repository: helix Updated Branches: refs/heads/helix-provisioning cb6aa4fa0 -> 57b4b180e Made container states more consistent, changed yarn target provider logic Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/57b4b180 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/57b4b180 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/57b4b180 Branch: refs/heads/helix-provisioning Commit: 57b4b180e0c0b7f3ae0c21191af1f72bca61732f Parents: cb6aa4f Author: Kanak Biscuitwala Authored: Wed Feb 19 18:58:00 2014 -0800 Committer: Kanak Biscuitwala Committed: Wed Feb 19 18:58:00 2014 -0800 ---------------------------------------------------------------------- .../controller/provisioner/ContainerState.java | 5 +- .../stages/ContainerProvisioningStage.java | 9 ++- .../integration/TestLocalContainerProvider.java | 4 +- .../provisioning/yarn/YarnProvisioner.java | 83 ++++++++++++-------- 4 files changed, 61 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java index cf4b736..449f636 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java @@ -23,8 +23,9 @@ public enum ContainerState { ACQUIRING, ACQUIRED, CONNECTING, - ACTIVE, - TEARDOWN, + CONNECTED, + DISCONNECTED, + HALTING, HALTED, FINALIZING, FINALIZED, http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index 48166bf..42c8218 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -22,7 +22,6 @@ package org.apache.helix.controller.stages; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -166,12 +165,13 @@ public class ContainerProvisioningStage extends AbstractBaseStage { accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // create the helix participant and add it to cluster - ListenableFuture future = containerProvider.startContainer(containerId, participant); + ListenableFuture future = + containerProvider.startContainer(containerId, participant); FutureCallback callback = new FutureCallback() { @Override public void onSuccess(Boolean result) { updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), - ContainerState.ACTIVE); + ContainerState.CONNECTED); } @Override @@ -225,7 +225,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { .toString()); final ContainerId containerId = existingInstance.getContainerId(); existingInstance.setInstanceEnabled(false); - existingInstance.setContainerState(ContainerState.TEARDOWN); + existingInstance.setContainerState(ContainerState.HALTING); accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // stop the container @@ -267,6 +267,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { InstanceConfig existingInstance = helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString()); existingInstance.setContainerState(state); + existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED)); accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance); } http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 0e4c803..0f7be64 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -250,7 +250,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { participantService.startAsync(); participantService.awaitRunning(); _participants.put(containerId, participantService); - _states.put(containerId, ContainerState.ACTIVE); + _states.put(containerId, ContainerState.CONNECTED); started++; SettableFuture future = SettableFuture.create(); future.set(true); @@ -294,7 +294,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { // acquired containers are ready to start containersToStart.add(participant); break; - case ACTIVE: + case CONNECTED: // stop at most two active at a time, wait for everything to be up first if (stopCount < 2 && _askCount >= MAX_PARTICIPANTS) { containersToStop.add(participant); http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index 4fcc219..daac87b 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -1,10 +1,6 @@ package org.apache.helix.provisioning.yarn; import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -13,22 +9,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.Vector; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.Executors; -import org.apache.commons.compress.archivers.ArchiveStreamFactory; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.Container; @@ -55,13 +42,13 @@ import org.apache.helix.controller.provisioner.ContainerProvider; import org.apache.helix.controller.provisioner.ContainerSpec; import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; -import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.TargetProvider; import org.apache.helix.controller.provisioner.TargetProviderResponse; import org.apache.helix.model.InstanceConfig; -import com.google.common.collect.Lists; import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -274,39 +261,57 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr .getProvisionerConfig(); int targetNumContainers = provisionerConfig.getNumContainers(); + // Any container that is in a state should be put in this set Set existingContainersIdSet = new HashSet(); - + + // Cache halted containers to determine which to restart and which to release + Map excessHaltedContainers = Maps.newHashMap(); + + // Cache participants to ensure that excess participants are stopped + Map excessActiveContainers = Maps.newHashMap(); for (Participant participant : participants) { ContainerConfig containerConfig = participant.getContainerConfig(); if (containerConfig != null && containerConfig.getState() != null) { ContainerState state = containerConfig.getState(); switch (state) { + case ACQUIRING: + existingContainersIdSet.add(containerConfig.getId()); + break; case ACQUIRED: // acquired containers are ready to start + existingContainersIdSet.add(containerConfig.getId()); containersToStart.add(participant); break; - case ACTIVE: + case CONNECTING: existingContainersIdSet.add(containerConfig.getId()); break; - case HALTED: - // halted containers can be released - containersToRelease.add(participant); + case CONNECTED: + // active containers can be stopped or kept active + existingContainersIdSet.add(containerConfig.getId()); + excessActiveContainers.put(containerConfig.getId(), participant); break; - case ACQUIRING: + case DISCONNECTED: + // disconnected containers must be stopped + existingContainersIdSet.add(containerConfig.getId()); + containersToStop.add(participant); + case HALTING: existingContainersIdSet.add(containerConfig.getId()); break; - case CONNECTING: + case HALTED: + // halted containers can be released or restarted + existingContainersIdSet.add(containerConfig.getId()); + excessHaltedContainers.put(containerConfig.getId(), participant); break; - case FAILED: - //remove the failed instance - _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId())); + case FINALIZING: + existingContainersIdSet.add(containerConfig.getId()); break; case FINALIZED: break; - case FINALIZING: - break; - case TEARDOWN: + case FAILED: + // remove the failed instance + _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), + new InstanceConfig(participant.getId())); break; default: break; @@ -318,18 +323,32 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr } } } - + for (int i = 0; i < targetNumContainers; i++) { ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i)); - if(!existingContainersIdSet.contains(containerId)){ + excessActiveContainers.remove(containerId); // don't stop this container if active + if (excessHaltedContainers.containsKey(containerId)) { + // Halted containers can be restarted if necessary + Participant participant = excessHaltedContainers.get(containerId); + containersToStart.add(participant); + excessHaltedContainers.remove(containerId); // don't release this container + } else if (!existingContainersIdSet.contains(containerId)) { + // Unallocated containers must be allocated ContainerSpec containerSpec = new ContainerSpec(containerId); ParticipantId participantId = ParticipantId.from(containerId.stringify()); - ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId); + ParticipantConfig participantConfig = + applicationSpec.getParticipantConfig(resourceId.stringify(), participantId); containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024)); containersToAcquire.add(containerSpec); } } - + + // Add all the containers that should be stopped because they fall outside the target range + containersToStop.addAll(excessActiveContainers.values()); + + // Add halted containers that should not be restarted + containersToRelease.addAll(excessHaltedContainers.values()); + response.setContainersToAcquire(containersToAcquire); response.setContainersToStart(containersToStart); response.setContainersToRelease(containersToRelease);