From common-commits-return-90717-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Tue Nov 13 08:50:39 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 008FF18067C for ; Tue, 13 Nov 2018 08:50:37 +0100 (CET) Received: (qmail 29545 invoked by uid 500); 13 Nov 2018 07:50:33 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 24764 invoked by uid 99); 13 Nov 2018 07:50:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Nov 2018 07:50:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 62955E12C2; Tue, 13 Nov 2018 07:50:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brahma@apache.org To: common-commits@hadoop.apache.org Date: Tue, 13 Nov 2018 07:50:54 -0000 Message-Id: <6157e6913a924b35b25b8e3acf0582a8@git.apache.org> In-Reply-To: <7db04a8bb7b542378b7052f3836c74a8@git.apache.org> References: <7db04a8bb7b542378b7052f3836c74a8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/49] hadoop git commit: YARN-8902. [CSI] Add volume manager that manages CSI volume lifecycle. Contributed by Weiwei Yang. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java new file mode 100644 index 0000000..f275768 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; + +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; +import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * AMS processor that handles volume resource requests. + * + * On allocate, the volumes referenced by the scheduling requests are + * provisioned through the volume manager before the request is handed to + * the next processor; register and finish calls are delegated unchanged. + */ +public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor { + + private static final Logger LOG = LoggerFactory + .getLogger(VolumeAMSProcessor.class); + + private ApplicationMasterServiceProcessor nextAMSProcessor; + private VolumeManager volumeManager; + + @Override + public void init(ApplicationMasterServiceContext amsContext, + ApplicationMasterServiceProcessor nextProcessor) { + LOG.info("Initializing CSI volume processor"); + this.nextAMSProcessor = nextProcessor; + this.volumeManager = ((RMContext) amsContext).getVolumeManager(); + } + + @Override + public void registerApplicationMaster( + ApplicationAttemptId applicationAttemptId, + RegisterApplicationMasterRequest request, + RegisterApplicationMasterResponse response) + throws IOException, YarnException { + this.nextAMSProcessor.registerApplicationMaster(applicationAttemptId, + request, response); + } + + /** + * Schedules a provisioning task for the volumes found in the request and + * waits up to 3 seconds for its result. A VolumeProvisioningException is + * thrown if the result is not successful, and a VolumeException if the + * wait times out, is interrupted or the task fails. When no volumes are + * requested, or provisioning succeeds, the call goes to the next processor. + */ + @Override + public void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse response) throws YarnException { + List volumes = aggregateVolumesFrom(request); + if (volumes != null && volumes.size() > 0) { + ScheduledFuture result = + this.volumeManager.schedule(new VolumeProvisioningTask(volumes), 0); + try { + VolumeProvisioningResults volumeResult = + result.get(3, TimeUnit.SECONDS); + if (!volumeResult.isSuccess()) { + throw new VolumeProvisioningException("Volume provisioning failed," + + " result details: " + volumeResult.getBriefMessage()); + } + } catch (TimeoutException | InterruptedException | ExecutionException e) { + /* NOTE(review): the thread's interrupt status is not restored when InterruptedException is caught here - confirm this is intended. */ + LOG.warn("Volume provisioning task failed", e); + throw new VolumeException("Volume provisioning task failed", e); + } + } + + // Go to next processor +
this.nextAMSProcessor.allocate(appAttemptId, request, response); + } + + // Collects the volumes requested through the scheduling requests of the + // given allocate request. Currently only scheduling requests are supported. + private List aggregateVolumesFrom(AllocateRequest request) + throws VolumeException { + List volumeList = new ArrayList<>(); + List requests = request.getSchedulingRequests(); + if (requests != null) { + for (SchedulingRequest req : requests) { + Resource totalResource = req.getResourceSizing().getResources(); + List resourceList = + totalResource.getAllResourcesListCopy(); + for (ResourceInformation resourceInformation : resourceList) { + List volumes = + VolumeMetaData.fromResource(resourceInformation); + for (VolumeMetaData vs : volumes) { + if (vs.getVolumeCapabilityRange().getMinCapacity() <= 0) { + // min capacity is not positive: treated as not specified, ignore + continue; + } else if (vs.isProvisionedVolume()) { + volumeList.add(checkAndGetVolume(vs)); + } else { + throw new InvalidVolumeException("Only pre-provisioned volume" + + " is supported now, volumeID must exist."); + } + } + } + } + } + return volumeList; + } + + /** + * If the given volume ID already exists in the volume manager, + * it returns the existing volume. Otherwise, it creates a new + * volume and adds it to the volume manager.
+ * @param metaData metadata of the requested volume + * @return the volume returned by the volume manager for the given metadata + */ + private Volume checkAndGetVolume(VolumeMetaData metaData) { + Volume toAdd = new VolumeImpl(metaData); + return this.volumeManager.addOrGetVolume(toAdd); + } + + @Override + public void finishApplicationMaster( + ApplicationAttemptId applicationAttemptId, + FinishApplicationMasterRequest request, + FinishApplicationMasterResponse response) { + this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId, + request, response); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java new file mode 100644 index 0000000..788a417 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains AMS processor class for volume handling. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java new file mode 100644 index 0000000..47f0df2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import java.util.concurrent.Callable; + +/** + * A task interface to provision a volume to the expected state. + * It declares no members of its own; implementations supply the + * {@code call()} method inherited from {@code Callable}.
+ */ +@Private +@Unstable +public interface VolumeProvisioner extends Callable { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java new file mode 100644 index 0000000..657fa6c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import com.google.gson.JsonObject; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; + +import java.util.HashMap; +import java.util.Map; + +/** + * Result of provisioning a set of volumes, kept as one entry per volume ID. + * The overall result is successful only when at least one entry has been + * added and every entry is in the {@code NODE_READY} state. + */ +public class VolumeProvisioningResults { + + private Map resultMap; + + public VolumeProvisioningResults() { + this.resultMap = new HashMap<>(); + } + + /** + * @return true if the result map is not empty and every per-volume + * result is successful; false otherwise, including when it is empty + */ + public boolean isSuccess() { + return !resultMap.isEmpty() && resultMap.values().stream() + .allMatch(subResult -> subResult.isSuccess()); + } + + /** + * @return a JSON string holding the total number of volumes and, under + * "failedVolumesStates", the state name of each unsuccessful volume + */ + public String getBriefMessage() { + JsonObject obj = new JsonObject(); + obj.addProperty("TotalVolumes", resultMap.size()); + + JsonObject failed = new JsonObject(); + for (VolumeProvisioningResult result : resultMap.values()) { + if (!result.isSuccess()) { + failed.addProperty(result.getVolumeId().toString(), + result.getVolumeState().name()); + } + } + obj.add("failedVolumesStates", failed); + return obj.toString(); + } + + /** + * Provisioning result of a single volume; it counts as successful + * only when the recorded state is {@code NODE_READY}. + */ + static class VolumeProvisioningResult { + + private VolumeId volumeId; + private VolumeState volumeState; + private boolean success; + + VolumeProvisioningResult(VolumeId volumeId, VolumeState state) { + this.volumeId = volumeId; + this.volumeState = state; + this.success = state == VolumeState.NODE_READY; + } + + public boolean isSuccess() { + return this.success; + } + + public VolumeId getVolumeId() { + return this.volumeId; + } + + public VolumeState getVolumeState() { + return this.volumeState; + } + } + + /** + * Records the given state as the provisioning result of the given volume. + */ + public void addResult(VolumeId volumeId, VolumeState state) { + this.resultMap.put(volumeId, + new VolumeProvisioningResult(volumeId, state)); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java new file mode 100644 index 0000000..eb35431 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A provisioning task encapsulates all the logic required by a storage system + * to provision a volume. This class is the common implementation; it may + * be overridden if the provisioning behavior of a certain storage system + * does not completely align with this implementation. + */ +public class VolumeProvisioningTask implements VolumeProvisioner { + + private static final Logger LOG = LoggerFactory + .getLogger(VolumeProvisioningTask.class); + + private List volumes; + + public VolumeProvisioningTask(List volumes) { + this.volumes = volumes; + } + + public List getVolumes() { + return this.volumes; + } + + /** + * Sends a validate event followed by a controller-publish event to each + * volume, then reports the state of every volume. + * @return the results holding the state of each volume after the events + */ + @Override + public VolumeProvisioningResults call() throws Exception { + VolumeProvisioningResults vpr = new VolumeProvisioningResults(); + + // Drive each volume through validation and controller publish + for (Volume vs : volumes) { + LOG.info("Provisioning volume : {}", vs.getVolumeId().toString()); + vs.handle(new ValidateVolumeEvent(vs)); + vs.handle(new ControllerPublishVolumeEvent(vs)); + } + + // collect the resulting state of every volume + volumes.stream().forEach(v -> + vpr.addResult(v.getVolumeId(), v.getVolumeState())); + + return vpr; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java new file mode 100644 index 0000000..92b4bdf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the volume provisioning classes. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java new file mode 100644 index 0000000..3f9bc93 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes to manage storage volumes in YARN. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.volume; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java new file mode 100644 index 0000000..5132864 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for building and reading a volume capability range. + */ +public class TestVolumeCapabilityRange { + + /** + * Building with a negative min capacity is expected to be rejected. + */ + @Test(expected = InvalidVolumeException.class) + public void testInvalidMinCapability() throws InvalidVolumeException { + VolumeCapabilityRange.newBuilder() + .minCapacity(-1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + } + + /** + * Building without setting a min capacity is expected to be rejected. + */ + @Test(expected = InvalidVolumeException.class) + public void testMissingMinCapability() throws InvalidVolumeException { + VolumeCapabilityRange.newBuilder() + .maxCapacity(5L) + .unit("Gi") + .build(); + } + + /** + * Building without setting a unit is expected to be rejected. + */ + @Test(expected = InvalidVolumeException.class) + public void testMissingUnit() throws InvalidVolumeException { + VolumeCapabilityRange.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .build(); + } + + /** + * A fully specified range returns the values it was built with. + */ + @Test + public void testGetVolumeCapability() throws InvalidVolumeException { + VolumeCapabilityRange vc = VolumeCapabilityRange.newBuilder() + .minCapacity(0L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + Assert.assertEquals(0L, vc.getMinCapacity()); + Assert.assertEquals(5L, vc.getMaxCapacity()); + Assert.assertEquals("Gi", vc.getUnit()); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java new file mode 100644 index 0000000..18c23e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.*; + +/** + * Test cases for volume lifecycle management: the state transitions of a + * volume on validate and controller-publish events. + */ +public class TestVolumeLifecycle { + + @Test + public void testValidation() throws InvalidVolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder() + .volumeId("test_vol_00000001") + .maxCapability(5L) + .unit("Gi") + .mountPoint("/path/to/mount") + .driverName("test-driver-name") + .build(); + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + } + + @Test + public void testValidationFailure() throws VolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE + // Simulate a failed API call to the adaptor + doThrow(new VolumeException("failed")).when(mockedClient).validateVolume(); + volume.handle(new
ValidateVolumeEvent(volume)); + + try { + // The volume is expected to stay UNAVAILABLE, so waiting for the + // VALIDATED state must end with a timeout + GenericTestUtils.waitFor(() -> + volume.getVolumeState() == VolumeState.VALIDATED, 10, 50); + Assert.fail("Validate state not reached," + + " it should keep waiting until timeout"); + } catch (Exception e) { + Assert.assertTrue(e instanceof TimeoutException); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + } + } + + @Test + public void testValidated() throws InvalidVolumeException { + AtomicInteger validatedTimes = new AtomicInteger(0); + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() { + @Override + public void validateVolume() { + validatedTimes.incrementAndGet(); + } + + @Override + public void controllerPublishVolume() { + // do nothing + } + }; + // The client counts how many times validateVolume has been called + volume.setClient(mockedClient); + + // NEW -> VALIDATED + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + Assert.assertEquals(1, validatedTimes.get()); + + // VALIDATED -> VALIDATED (validateVolume is not called a second time) + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + Assert.assertEquals(1, validatedTimes.get()); + } + + @Test + public void testUnavailableState() throws VolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE + doThrow(new VolumeException("failed")).when(mockedClient) + .validateVolume(); + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + volume.handle(new ValidateVolumeEvent(volume)); +
Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + + // UNAVAILABLE -> UNAVAILABLE (validateVolume still fails) + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + + // UNAVAILABLE -> VALIDATED (validateVolume no longer fails) + doNothing().when(mockedClient).validateVolume(); + volume.setClient(mockedClient); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); + } + + @Test + public void testPublishUnavailableVolume() throws VolumeException { + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE (on validateVolume) + doThrow(new VolumeException("failed")).when(mockedClient) + .validateVolume(); + Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); + volume.handle(new ValidateVolumeEvent(volume)); + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + + // UNAVAILABLE -> UNAVAILABLE (on publishVolume) + volume.handle(new ControllerPublishVolumeEvent(volume)); + // controller publish is not called since the state is UNAVAILABLE + verify(mockedClient, times(0)).controllerPublishVolume(); + // state remains UNAVAILABLE + Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java new file mode 100644 index 0000000..38dbe03 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.hadoop.yarn.server.volume.csi.CsiConstants; +import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +/** + * Test cases for volume specification definition and parsing. 
+ */ +public class TestVolumeMetaData { + + @Test + public void testPreprovisionedVolume() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + // When volume id is given, volume name is optional + VolumeMetaData meta = VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + + Assert.assertEquals(new VolumeId("id-000001"), meta.getVolumeId()); + Assert.assertEquals(1L, meta.getVolumeCapabilityRange().getMinCapacity()); + Assert.assertEquals(5L, meta.getVolumeCapabilityRange().getMaxCapacity()); + Assert.assertEquals("Gi", meta.getVolumeCapabilityRange().getUnit()); + Assert.assertEquals("csi-demo-driver", meta.getDriverName()); + Assert.assertEquals("/mnt/data", meta.getMountPoint()); + Assert.assertNull(meta.getVolumeName()); + Assert.assertTrue(meta.isProvisionedVolume()); + + // Test toString + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(meta.toString()); + JsonObject json = element.getAsJsonObject(); + Assert.assertNotNull(json); + Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_NAME)); + Assert.assertEquals("id-000001", + json.get(CsiConstants.CSI_VOLUME_ID).getAsString()); + Assert.assertEquals("csi-demo-driver", + json.get(CsiConstants.CSI_DRIVER_NAME).getAsString()); + Assert.assertEquals("/mnt/data", + json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString()); + + } + + @Test + public void testDynamicalProvisionedVolume() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + // When volume name is given, volume id is optional + VolumeMetaData meta = VolumeMetaData.newBuilder() + .volumeName("volume-name") + .capability(cap) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + 
Assert.assertNotNull(meta); + + Assert.assertEquals("volume-name", meta.getVolumeName()); + Assert.assertEquals(1L, meta.getVolumeCapabilityRange().getMinCapacity()); + Assert.assertEquals(5L, meta.getVolumeCapabilityRange().getMaxCapacity()); + Assert.assertEquals("Gi", meta.getVolumeCapabilityRange().getUnit()); + Assert.assertEquals("csi-demo-driver", meta.getDriverName()); + Assert.assertEquals("/mnt/data", meta.getMountPoint()); + Assert.assertFalse(meta.isProvisionedVolume()); + + // Test toString + JsonParser parser = new JsonParser(); + JsonElement element = parser.parse(meta.toString()); + JsonObject json = element.getAsJsonObject(); + Assert.assertNotNull(json); + Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_ID)); + Assert.assertEquals("volume-name", + json.get(CsiConstants.CSI_VOLUME_NAME).getAsString()); + Assert.assertEquals("csi-demo-driver", + json.get(CsiConstants.CSI_DRIVER_NAME).getAsString()); + Assert.assertEquals("/mnt/data", + json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString()); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingMountpoint() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .driverName("csi-demo-driver") + .build(); + } + + + @Test(expected = InvalidVolumeException.class) + public void testMissingCsiDriverName() throws InvalidVolumeException { + VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder() + .minCapacity(1L) + .maxCapacity(5L) + .unit("Gi") + .build(); + + VolumeMetaData.newBuilder() + .volumeId(new VolumeId("id-000001")) + .capability(cap) + .mountPoint("/mnt/data") + .build(); + } + + @Test(expected = InvalidVolumeException.class) + public void testMissingVolumeCapability() throws InvalidVolumeException { + VolumeMetaData.newBuilder() + .volumeId(new 
VolumeId("id-000001")) + .driverName("csi-demo-driver") + .mountPoint("/mnt/data") + .build(); + } + + @Test + public void testVolumeId() { + VolumeId id1 = new VolumeId("test00001"); + VolumeId id11 = new VolumeId("test00001"); + VolumeId id2 = new VolumeId("test00002"); + + Assert.assertEquals(id1, id11); + Assert.assertEquals(id1.hashCode(), id11.hashCode()); + Assert.assertNotEquals(id1, id2); + + HashMap map = new HashMap<>(); + map.put(id1, "1"); + Assert.assertEquals(1, map.size()); + Assert.assertEquals("1", map.get(id11)); + map.put(id11, "2"); + Assert.assertEquals(1, map.size()); + Assert.assertEquals("2", map.get(id11)); + Assert.assertEquals("2", map.get(new VolumeId("test00001"))); + + Assert.assertNotEquals(id1, id2); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java new file mode 100644 index 0000000..d6f9d92 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; +import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor; +import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import org.apache.hadoop.yarn.server.volume.csi.CsiConstants; +import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Arrays; + +/** + * Test cases for volume processor. + */ +public class TestVolumeProcessor { + + private static final int GB = 1024; + private YarnConfiguration conf; + private RMNodeLabelsManager mgr; + private MockRM rm; + private MockNM[] mockNMS; + private RMNode[] rmNodes; + private static final int NUM_OF_NMS = 4; + // resource-types.xml will be created under target/test-classes/ dir, + // it must be deleted after the test is done, to avoid it from reading + // by other UT classes. 
+ private File resourceTypesFile = null; + + private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume"; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + resourceTypesFile = new File(conf.getClassLoader() + .getResource(".").getPath(), "resource-types.xml"); + writeTmpResourceTypesFile(resourceTypesFile); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + conf.set("yarn.scheduler.capacity.resource-calculator", + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy", + "fair"); + conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, + VolumeAMSProcessor.class.getName()); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + mockNMS = new MockNM[NUM_OF_NMS]; + rmNodes = new RMNode[NUM_OF_NMS]; + for (int i = 0; i < 4; i++) { + mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB); + rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId()); + } + } + + @After + public void tearDown() { + if (resourceTypesFile != null && resourceTypesFile.exists()) { + resourceTypesFile.delete(); + } + } + + private void writeTmpResourceTypesFile(File tmpFile) throws IOException { + FileWriter fw = new FileWriter(tmpFile); + try { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME); + yarnConf.set("yarn.resource-types." 
+ + VOLUME_RESOURCE_NAME + ".units", "Mi"); + yarnConf.writeXml(fw); + } finally { + fw.close(); + } + } + + @Test (timeout = 10000L) + public void testVolumeProvisioning() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + CsiConstants.CSI_VOLUME_ID, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Arrays.asList(sc)) + .build(); + + am1.allocate(ar); + VolumeStates volumeStates = + rm.getRMContext().getVolumeManager().getVolumeStates(); + Assert.assertNotNull(volumeStates); + VolumeState volumeState = VolumeState.NEW; + while (volumeState != VolumeState.NODE_READY) { + Volume volume = volumeStates + .getVolume(new VolumeId("test-vol-000001")); + if (volume != null) { + volumeState = volume.getVolumeState(); + } + am1.doHeartbeat(); + mockNMS[0].nodeHeartbeat(true); + Thread.sleep(500); + } + rm.stop(); + } + + @Test (timeout = 30000L) + public void testInvalidRequest() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, 
+ ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + // volume ID is missing... + CsiConstants.CSI_VOLUME_NAME, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() + .schedulingRequests(Arrays.asList(sc)) + .build(); + + try { + am1.allocate(ar); + Assert.fail("allocate should fail because invalid request received"); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidVolumeException); + } + rm.stop(); + } + + @Test (timeout = 30000L) + public void testProvisioningFailures() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + + CsiAdaptorClientProtocol mockedClient = Mockito + .mock(CsiAdaptorClientProtocol.class); + // inject adaptor client + rm.getRMContext().getVolumeManager().setClient(mockedClient); + Mockito.doThrow(new VolumeException("failed")).when(mockedClient) + .validateVolume(); + + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + CsiConstants.CSI_VOLUME_ID, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + AllocateRequest ar = AllocateRequest.newBuilder() 
+ .schedulingRequests(Arrays.asList(sc)) + .build(); + + try { + am1.allocate(ar); + Assert.fail("allocate should fail"); + } catch (Exception e) { + Assert.assertTrue(e instanceof VolumeProvisioningException); + } + rm.stop(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org