From commits-return-21652-archive-asf-public=cust-asf.ponee.io@mesos.apache.org Thu Apr 12 23:02:42 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 70DE618077F for ; Thu, 12 Apr 2018 23:02:41 +0200 (CEST) Received: (qmail 85042 invoked by uid 500); 12 Apr 2018 21:02:40 -0000 Mailing-List: contact commits-help@mesos.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mesos.apache.org Delivered-To: mailing list commits@mesos.apache.org Received: (qmail 84906 invoked by uid 99); 12 Apr 2018 21:02:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Apr 2018 21:02:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE25BF6BDF; Thu, 12 Apr 2018 21:02:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chhsiao@apache.org To: commits@mesos.apache.org Date: Thu, 12 Apr 2018 21:02:43 -0000 Message-Id: <111c743a738b4cd299a824606ffb25b3@git.apache.org> In-Reply-To: <389421b22b2544f99510867057097f05@git.apache.org> References: <389421b22b2544f99510867057097f05@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/6] mesos git commit: Adapted storage local resource provider to use CSI v0.2. Adapted storage local resource provider to use CSI v0.2. This patch contains necessary changes for the storage local resource provider to use CSI v0.2. Support for the `STAGE_UNSTAGE_VOLUME` CSI node service capability is not implemented in this patch yet. Review: https://reviews.apache.org/r/66410/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aeffcd7d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aeffcd7d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aeffcd7d Branch: refs/heads/master Commit: aeffcd7d9a1f9c97e5e347063ceab71c43c00e2d Parents: 6dfd259 Author: Chun-Hung Hsiao Authored: Thu Apr 12 12:07:19 2018 -0700 Committer: Chun-Hung Hsiao Committed: Thu Apr 12 14:01:52 2018 -0700 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 512 ++++++++++++------------ 1 file changed, 257 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/aeffcd7d/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index a07620d..40544e0 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -366,10 +366,11 @@ private: void reconcileOperations( const Event::ReconcileOperations& reconcile); - Future connect(const string& endpoint); - Future getService(const ContainerID& containerId); + Future connect(const string& endpoint); + Future getService(const ContainerID& containerId); Future killService(const ContainerID& containerId); + Future prepareIdentityService(); Future prepareControllerService(); Future prepareNodeService(); Future controllerPublish(const string& volumeId); @@ -384,7 +385,7 @@ private: Future validateCapability( const string& volumeId, const Option& metadata, - const csi::VolumeCapability& capability); + const csi::v0::VolumeCapability& capability); Future listVolumes(); Future getCapacities(); @@ -436,9 +437,8 @@ private: shared_ptr diskProfileAdaptor; - csi::Version csiVersion; - csi::VolumeCapability defaultMountCapability; - csi::VolumeCapability defaultBlockCapability; + csi::v0::VolumeCapability defaultMountCapability; + csi::v0::VolumeCapability defaultBlockCapability; string bootId; process::grpc::client::Runtime runtime; Owned driver; @@ -453,14 +453,15 @@ private: // True if a reconciliation of storage pools is happening. bool reconciling; - ContainerID controllerContainerId; - ContainerID nodeContainerId; hashmap> daemons; - hashmap>> services; - - Option controllerInfo; - Option nodeInfo; - Option controllerCapabilities; + hashmap>> services; + + Option nodeContainerId; + Option controllerContainerId; + Option pluginInfo; + csi::v0::PluginCapabilities pluginCapabilities; + csi::v0::ControllerCapabilities controllerCapabilities; + csi::v0::NodeCapabilities nodeCapabilities; Option nodeId; // We maintain the following invariant: if one operation depends on @@ -554,18 +555,13 @@ void StorageLocalResourceProviderProcess::received(const Event& event) void StorageLocalResourceProviderProcess::initialize() { - // Set CSI version to 0.1.0. - csiVersion.set_major(0); - csiVersion.set_minor(1); - csiVersion.set_patch(0); - // Default mount and block capabilities for pre-existing volumes. defaultMountCapability.mutable_mount(); defaultMountCapability.mutable_access_mode() - ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); defaultBlockCapability.mutable_block(); defaultBlockCapability.mutable_access_mode() - ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); Try _bootId = os::bootId(); if (_bootId.isError()) { @@ -577,24 +573,24 @@ void StorageLocalResourceProviderProcess::initialize() foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { - auto it = find( - container.services().begin(), - container.services().end(), - CSIPluginContainerInfo::CONTROLLER_SERVICE); - if (it != container.services().end()) { - controllerContainerId = getContainerId(info, container); + if (container.services().end() != find( + container.services().begin(), + container.services().end(), + CSIPluginContainerInfo::NODE_SERVICE)) { + nodeContainerId = getContainerId(info, container); break; } } + CHECK_SOME(nodeContainerId); + foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { - auto it = find( - container.services().begin(), - container.services().end(), - CSIPluginContainerInfo::NODE_SERVICE); - if (it != container.services().end()) { - nodeContainerId = getContainerId(info, container); + if (container.services().end() != find( + container.services().begin(), + container.services().end(), + CSIPluginContainerInfo::CONTROLLER_SERVICE)) { + controllerContainerId = getContainerId(info, container); break; } } @@ -720,10 +716,12 @@ Future StorageLocalResourceProviderProcess::recoverServices() const ContainerID& containerId = containerPath->containerId; + CHECK_SOME(nodeContainerId); + // Do not kill the up-to-date controller or node container. // Otherwise, kill them and perform cleanups. - if (containerId == controllerContainerId || - containerId == nodeContainerId) { + if (nodeContainerId == containerId || + controllerContainerId == containerId) { const string configPath = csi::paths::getContainerInfoPath( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), @@ -776,11 +774,14 @@ Future StorageLocalResourceProviderProcess::recoverServices() }))); } - // NOTE: The `GetNodeID` CSI call is only supported if the plugin has - // the `PUBLISH_UNPUBLISH_VOLUME` controller capability. So to decide - // if `GetNodeID` should be called in `prepareNodeService`, we need to - // run `prepareControllerService` first. + // NOTE: The `Controller` service is supported if the plugin has the + // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is + // supported if the `Controller` service has the + // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch + // the node plugin to get the plugin capabilities, then decide if we + // need to launch the controller plugin and get the node ID. return collect(futures) + .then(defer(self(), &Self::prepareIdentityService)) .then(defer(self(), &Self::prepareControllerService)) .then(defer(self(), &Self::prepareNodeService)); } @@ -1672,19 +1673,19 @@ void StorageLocalResourceProviderProcess::reconcileOperations( // Returns a future of a CSI client that waits for the endpoint socket // to appear if necessary, then connects to the socket and check its -// supported version. -Future StorageLocalResourceProviderProcess::connect( +// readiness. +Future StorageLocalResourceProviderProcess::connect( const string& endpoint) { - Future client; + Future future; if (os::exists(endpoint)) { - client = csi::Client("unix://" + endpoint, runtime); + future = csi::v0::Client("unix://" + endpoint, runtime); } else { // Wait for the endpoint socket to appear until the timeout expires. Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); - client = loop( + future = loop( self(), [=]() -> Future { if (timeout.expired()) { @@ -1693,30 +1694,19 @@ Future StorageLocalResourceProviderProcess::connect( return after(Milliseconds(10)); }, - [=](const Nothing&) -> ControlFlow { + [=](const Nothing&) -> ControlFlow { if (os::exists(endpoint)) { - return Break(csi::Client("unix://" + endpoint, runtime)); + return Break(csi::v0::Client("unix://" + endpoint, runtime)); } return Continue(); }); } - return client - .then(defer(self(), [=](csi::Client client) { - return client.GetSupportedVersions(csi::GetSupportedVersionsRequest()) - .then(defer(self(), [=]( - const csi::GetSupportedVersionsResponse& response) - -> Future { - auto it = find( - response.supported_versions().begin(), - response.supported_versions().end(), - csiVersion); - if (it == response.supported_versions().end()) { - return Failure( - "CSI version " + stringify(csiVersion) + " is not supported"); - } - + return future + .then(defer(self(), [=](csi::v0::Client client) { + return client.Probe(csi::v0::ProbeRequest()) + .then(defer(self(), [=](const csi::v0::ProbeResponse& response) { return client; })); })); @@ -1726,7 +1716,7 @@ Future StorageLocalResourceProviderProcess::connect( // Returns a future of the latest CSI client for the specified plugin // container. If the container is not already running, this method will // start a new a new container daemon. -Future StorageLocalResourceProviderProcess::getService( +Future StorageLocalResourceProviderProcess::getService( const ContainerID& containerId) { if (daemons.contains(containerId)) { @@ -1802,7 +1792,7 @@ Future StorageLocalResourceProviderProcess::getService( ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL); CHECK(!services.contains(containerId)); - services[containerId].reset(new Promise()); + services[containerId].reset(new Promise()); Try> daemon = ContainerDaemon::create( extractParentEndpoint(url), @@ -1815,7 +1805,7 @@ Future StorageLocalResourceProviderProcess::getService( CHECK(services.at(containerId)->future().isPending()); return connect(endpointPath) - .then(defer(self(), [=](const csi::Client& client) { + .then(defer(self(), [=](const csi::v0::Client& client) { services.at(containerId)->set(client); return Nothing(); })) @@ -1827,16 +1817,16 @@ Future StorageLocalResourceProviderProcess::getService( })); })), std::function()>(defer(self(), [=]() -> Future { - if (containerId == controllerContainerId) { + if (containerId == controllerContainerId.get()) { metrics.csi_controller_plugin_terminations++; } - if (containerId == nodeContainerId) { + if (containerId == nodeContainerId.get()) { metrics.csi_node_plugin_terminations++; } services.at(containerId)->discard(); - services.at(containerId).reset(new Promise()); + services.at(containerId).reset(new Promise()); if (os::exists(endpointPath)) { Try rm = os::rm(endpointPath); @@ -1940,54 +1930,31 @@ Future StorageLocalResourceProviderProcess::killService( } -Future StorageLocalResourceProviderProcess::prepareControllerService() +Future StorageLocalResourceProviderProcess::prepareIdentityService() { - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - // Get the plugin info and check for consistency. - csi::GetPluginInfoRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.GetPluginInfo(request) - .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) { - controllerInfo = response; + CHECK_SOME(nodeContainerId); - LOG(INFO) - << "Controller plugin loaded: " << stringify(controllerInfo.get()); + return getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the plugin info. + return client.GetPluginInfo(csi::v0::GetPluginInfoRequest()) + .then(defer(self(), [=]( + const csi::v0::GetPluginInfoResponse& response) { + pluginInfo = response; - if (nodeInfo.isSome() && - (controllerInfo->name() != nodeInfo->name() || - controllerInfo->vendor_version() != - nodeInfo->vendor_version())) { - LOG(WARNING) - << "Inconsistent controller and node plugin components. Please " - "check with the plugin vendor to ensure compatibility."; - } + LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get()); - // NOTE: We always get the latest service future before - // proceeding to the next step. - return getService(controllerContainerId); + // Get the latest service future before proceeding to the next step. + return getService(nodeContainerId.get()); })); })) - .then(defer(self(), [=](csi::Client client) { - // Probe the plugin to validate the runtime environment. - csi::ControllerProbeRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.ControllerProbe(request) - .then(defer(self(), [=](const csi::ControllerProbeResponse& response) { - return getService(controllerContainerId); - })); - })) - .then(defer(self(), [=](csi::Client client) { - // Get the controller capabilities. - csi::ControllerGetCapabilitiesRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.ControllerGetCapabilities(request) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the plugin capabilities. + return client.GetPluginCapabilities( + csi::v0::GetPluginCapabilitiesRequest()) .then(defer(self(), [=]( - const csi::ControllerGetCapabilitiesResponse& response) { - controllerCapabilities = response.capabilities(); + const csi::v0::GetPluginCapabilitiesResponse& response) { + pluginCapabilities = response.capabilities(); return Nothing(); })); @@ -1995,73 +1962,101 @@ Future StorageLocalResourceProviderProcess::prepareControllerService() } -Future StorageLocalResourceProviderProcess::prepareNodeService() +// NOTE: This can only be called after `prepareIdentityService`. +Future StorageLocalResourceProviderProcess::prepareControllerService() { - // NOTE: This can only be called after `prepareControllerService`. - CHECK_SOME(controllerCapabilities); + CHECK_SOME(pluginInfo); - return getService(nodeContainerId) - .then(defer(self(), [=](csi::Client client) { - // Get the plugin info and check for consistency. - csi::GetPluginInfoRequest request; - request.mutable_version()->CopyFrom(csiVersion); + if (!pluginCapabilities.controllerService) { + return Nothing(); + } - return client.GetPluginInfo(request) - .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) { - nodeInfo = response; + if (controllerContainerId.isNone()) { + return Failure( + stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found"); + } - LOG(INFO) << "Node plugin loaded: " << stringify(nodeInfo.get()); + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the controller plugin info and check for consistency. + return client.GetPluginInfo(csi::v0::GetPluginInfoRequest()) + .then(defer(self(), [=]( + const csi::v0::GetPluginInfoResponse& response) { + LOG(INFO) << "Controller plugin loaded: " << stringify(response); - if (controllerInfo.isSome() && - (controllerInfo->name() != nodeInfo->name() || - controllerInfo->vendor_version() != - nodeInfo->vendor_version())) { + if (pluginInfo->name() != response.name() || + pluginInfo->vendor_version() != response.vendor_version()) { LOG(WARNING) << "Inconsistent controller and node plugin components. Please " "check with the plugin vendor to ensure compatibility."; } - // NOTE: We always get the latest service future before - // proceeding to the next step. - return getService(nodeContainerId); + // Get the latest service future before proceeding to the next step. + return getService(controllerContainerId.get()); })); })) - .then(defer(self(), [=](csi::Client client) { - // Probe the plugin to validate the runtime environment. - csi::NodeProbeRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.NodeProbe(request) - .then(defer(self(), [=](const csi::NodeProbeResponse& response) { - return getService(nodeContainerId); + .then(defer(self(), [=](csi::v0::Client client) { + // Get the controller capabilities. + return client.ControllerGetCapabilities( + csi::v0::ControllerGetCapabilitiesRequest()) + .then(defer(self(), [=]( + const csi::v0::ControllerGetCapabilitiesResponse& response) { + controllerCapabilities = response.capabilities(); + + return Nothing(); })); - })) - .then(defer(self(), [=](csi::Client client) -> Future { - if (!controllerCapabilities->publishUnpublishVolume) { - return Nothing(); - } + })); +} - // Get the node ID. - csi::GetNodeIDRequest request; - request.mutable_version()->CopyFrom(csiVersion); - return client.GetNodeID(request) - .then(defer(self(), [=](const csi::GetNodeIDResponse& response) { - nodeId = response.node_id(); +// NOTE: This can only be called after `prepareIdentityService` and +// `prepareControllerService`. +Future StorageLocalResourceProviderProcess::prepareNodeService() +{ + CHECK_SOME(nodeContainerId); - return Nothing(); + return getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the node capabilities. + return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest()) + .then(defer(self(), [=]( + const csi::v0::NodeGetCapabilitiesResponse& response) + -> Future { + nodeCapabilities = response.capabilities(); + + // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. + if (nodeCapabilities.stageUnstageVolume) { + return Failure( + "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported"); + } + + // Get the latest service future before proceeding to the next step. + return getService(nodeContainerId.get()); + })) + .then(defer(self(), [=](csi::v0::Client client) -> Future { + if (!controllerCapabilities.publishUnpublishVolume) { + return Nothing(); + } + + // Get the node ID. + return client.NodeGetId(csi::v0::NodeGetIdRequest()) + .then(defer(self(), [=]( + const csi::v0::NodeGetIdResponse& response) { + nodeId = response.node_id(); + + return Nothing(); + })); })); })); } +// NOTE: This can only be called after `prepareControllerService` and +// `prepareNodeService`. Future StorageLocalResourceProviderProcess::controllerPublish( const string& volumeId) { - // NOTE: This can only be called after `prepareControllerService` and - // `prepareNodeService`. - CHECK_SOME(controllerCapabilities); - CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome()); + CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome()); CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == @@ -2080,11 +2075,12 @@ Future StorageLocalResourceProviderProcess::controllerPublish( Future controllerPublished; - if (controllerCapabilities->publishUnpublishVolume) { - controllerPublished = getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::ControllerPublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + if (controllerCapabilities.publishUnpublishVolume) { + CHECK_SOME(controllerContainerId); + + controllerPublished = getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::ControllerPublishVolumeRequest request; request.set_volume_id(volumeId); request.set_node_id(nodeId.get()); request.mutable_volume_capability() @@ -2095,9 +2091,9 @@ Future StorageLocalResourceProviderProcess::controllerPublish( return client.ControllerPublishVolume(request) .then(defer(self(), [=]( - const csi::ControllerPublishVolumeResponse& response) { - *volumes.at(volumeId).state.mutable_publish_volume_info() = - response.publish_volume_info(); + const csi::v0::ControllerPublishVolumeResponse& response) { + *volumes.at(volumeId).state.mutable_publish_info() = + response.publish_info(); return Nothing(); })); @@ -2122,13 +2118,13 @@ Future StorageLocalResourceProviderProcess::controllerPublish( } +// NOTE: This can only be called after `prepareControllerService` and +// `prepareNodeService`. Future StorageLocalResourceProviderProcess::controllerUnpublish( const string& volumeId) { - // NOTE: This can only be called after `prepareControllerService` and - // `prepareNodeService`. - CHECK_SOME(controllerCapabilities); - CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome()); + CHECK_SOME(controllerContainerId); + CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome()); CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == @@ -2147,11 +2143,10 @@ Future StorageLocalResourceProviderProcess::controllerUnpublish( Future controllerUnpublished; - if (controllerCapabilities->publishUnpublishVolume) { - controllerUnpublished = getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::ControllerUnpublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + if (controllerCapabilities.publishUnpublishVolume) { + controllerUnpublished = getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::ControllerUnpublishVolumeRequest request; request.set_volume_id(volumeId); request.set_node_id(nodeId.get()); @@ -2165,7 +2160,7 @@ Future StorageLocalResourceProviderProcess::controllerUnpublish( return controllerUnpublished .then(defer(self(), [=] { volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED); - volumes.at(volumeId).state.mutable_publish_volume_info()->clear(); + volumes.at(volumeId).state.mutable_publish_info()->clear(); checkpointVolumeState(volumeId); return Nothing(); @@ -2182,6 +2177,10 @@ Future StorageLocalResourceProviderProcess::controllerUnpublish( Future StorageLocalResourceProviderProcess::nodePublish( const string& volumeId) { + // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. + + CHECK_SOME(nodeContainerId); + CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == csi::state::VolumeState::NODE_PUBLISH) { @@ -2208,13 +2207,12 @@ Future StorageLocalResourceProviderProcess::nodePublish( "Failed to create mount point '" + mountPath + "': " + mkdir.error()); } - return getService(nodeContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::NodePublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + return getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::NodePublishVolumeRequest request; request.set_volume_id(volumeId); - *request.mutable_publish_volume_info() = - volumes.at(volumeId).state.publish_volume_info(); + *request.mutable_publish_info() = + volumes.at(volumeId).state.publish_info(); request.set_target_path(mountPath); request.mutable_volume_capability() ->CopyFrom(volumes.at(volumeId).state.volume_capability()); @@ -2243,6 +2241,10 @@ Future StorageLocalResourceProviderProcess::nodePublish( Future StorageLocalResourceProviderProcess::nodeUnpublish( const string& volumeId) { + // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. + + CHECK_SOME(nodeContainerId); + CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == csi::state::VolumeState::NODE_UNPUBLISH) { @@ -2267,10 +2269,9 @@ Future StorageLocalResourceProviderProcess::nodeUnpublish( Future nodeUnpublished; if (os::exists(mountPath)) { - nodeUnpublished = getService(nodeContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::NodeUnpublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + nodeUnpublished = getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::NodeUnpublishVolumeRequest request; request.set_volume_id(volumeId); request.set_target_path(mountPath); @@ -2310,22 +2311,22 @@ Future StorageLocalResourceProviderProcess::nodeUnpublish( // Returns a CSI volume ID. +// NOTE: This can only be called after `prepareControllerService`. Future StorageLocalResourceProviderProcess::createVolume( const string& name, const Bytes& capacity, const DiskProfileAdaptor::ProfileInfo& profileInfo) { - // NOTE: This can only be called after `prepareControllerService`. - CHECK_SOME(controllerCapabilities); - - if (!controllerCapabilities->createDeleteVolume) { - return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported"); + if (!controllerCapabilities.createDeleteVolume) { + return Failure( + "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); } - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::CreateVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::CreateVolumeRequest request; request.set_name(name); request.mutable_capacity_range() ->set_required_bytes(capacity.bytes()); @@ -2335,47 +2336,49 @@ Future StorageLocalResourceProviderProcess::createVolume( *request.mutable_parameters() = profileInfo.parameters; return client.CreateVolume(request) - .then(defer(self(), [=](const csi::CreateVolumeResponse& response) { - const csi::VolumeInfo& volumeInfo = response.volume_info(); + .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) { + const csi::v0::Volume& volume = response.volume(); - if (volumes.contains(volumeInfo.id())) { + if (volumes.contains(volume.id())) { // The resource provider failed over after the last // `CreateVolume` call, but before the operation status was // checkpointed. CHECK_EQ(csi::state::VolumeState::CREATED, - volumes.at(volumeInfo.id()).state.state()); + volumes.at(volume.id()).state.state()); } else { csi::state::VolumeState volumeState; volumeState.set_state(csi::state::VolumeState::CREATED); volumeState.mutable_volume_capability() ->CopyFrom(profileInfo.capability); - *volumeState.mutable_volume_attributes() = volumeInfo.attributes(); + *volumeState.mutable_volume_attributes() = volume.attributes(); - volumes.put(volumeInfo.id(), std::move(volumeState)); - checkpointVolumeState(volumeInfo.id()); + volumes.put(volume.id(), std::move(volumeState)); + checkpointVolumeState(volume.id()); } - return volumeInfo.id(); + return volume.id(); })); })); } +// NOTE: This can only be called after `prepareControllerService` and +// `prepareNodeService` (since it may require `NodeUnpublishVolume`). Future StorageLocalResourceProviderProcess::deleteVolume( const string& volumeId, bool preExisting) { - // NOTE: This can only be called after `prepareControllerService` and - // `prepareNodeService` (since it may require `NodeUnpublishVolume`). - CHECK_SOME(controllerCapabilities); - CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome()); + CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome()); // We do not need the capability for pre-existing volumes since no // actual `DeleteVolume` call will be made. - if (!preExisting && !controllerCapabilities->createDeleteVolume) { - return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported"); + if (!preExisting && !controllerCapabilities.createDeleteVolume) { + return Failure( + "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); } + CHECK_SOME(controllerContainerId); + const string volumePath = csi::paths::getVolumePath( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), @@ -2399,10 +2402,9 @@ Future StorageLocalResourceProviderProcess::deleteVolume( case csi::state::VolumeState::CREATED: { if (!preExisting) { deleted = deleted - .then(defer(self(), &Self::getService, controllerContainerId)) - .then(defer(self(), [=](csi::Client client) { - csi::DeleteVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + .then(defer(self(), &Self::getService, controllerContainerId.get())) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::DeleteVolumeRequest request; request.set_volume_id(volumeId); return client.DeleteVolume(request) @@ -2443,34 +2445,41 @@ Future StorageLocalResourceProviderProcess::deleteVolume( } -// Validates if a volume has the specified capability. This is called -// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing -// volume, so we make it returns a volume ID, similar to `createVolume`. +// Validates if a volume has the specified capability. This is called when +// applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing volume, so we +// make it returns a volume ID, similar to `createVolume`. +// NOTE: This can only be called after `prepareIdentityService` and only for +// newly discovered volumes. Future StorageLocalResourceProviderProcess::validateCapability( const string& volumeId, const Option& metadata, - const csi::VolumeCapability& capability) + const csi::v0::VolumeCapability& capability) { - // NOTE: This can only be called for newly discovered volumes. CHECK(!volumes.contains(volumeId)); - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { + if (!pluginCapabilities.controllerService) { + return Failure( + "Plugin capability 'CONTROLLER_SERVICE' is not supported"); + } + + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { google::protobuf::Map volumeAttributes; if (metadata.isSome()) { volumeAttributes = convertLabelsToStringMap(metadata.get()).get(); } - csi::ValidateVolumeCapabilitiesRequest request; - request.mutable_version()->CopyFrom(csiVersion); + csi::v0::ValidateVolumeCapabilitiesRequest request; request.set_volume_id(volumeId); request.add_volume_capabilities()->CopyFrom(capability); *request.mutable_volume_attributes() = volumeAttributes; return client.ValidateVolumeCapabilities(request) .then(defer(self(), [=]( - const csi::ValidateVolumeCapabilitiesResponse& response) + const csi::v0::ValidateVolumeCapabilitiesResponse& response) -> Future { if (!response.supported()) { return Failure( @@ -2492,27 +2501,25 @@ Future StorageLocalResourceProviderProcess::validateCapability( } +// NOTE: This can only be called after `prepareControllerService` and +// the resource provider ID has been obtained. Future StorageLocalResourceProviderProcess::listVolumes() { - // NOTE: This can only be called after `prepareControllerService` and - // the resource provider ID has been obtained. - CHECK_SOME(controllerCapabilities); CHECK(info.has_id()); // This is only used for reconciliation so no failure is returned. - if (!controllerCapabilities->listVolumes) { + if (!controllerCapabilities.listVolumes) { return Resources(); } - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { // TODO(chhsiao): Set the max entries and use a loop to do // mutliple `ListVolumes` calls. - csi::ListVolumesRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.ListVolumes(request) - .then(defer(self(), [=](const csi::ListVolumesResponse& response) { + return client.ListVolumes(csi::v0::ListVolumesRequest()) + .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { Resources resources; // Recover disk profiles from the checkpointed state. @@ -2529,15 +2536,14 @@ Future StorageLocalResourceProviderProcess::listVolumes() foreach (const auto& entry, response.entries()) { resources += createRawDiskResource( info, - Bytes(entry.volume_info().capacity_bytes()), - volumesToProfiles.contains(entry.volume_info().id()) - ? volumesToProfiles.at(entry.volume_info().id()) + Bytes(entry.volume().capacity_bytes()), + volumesToProfiles.contains(entry.volume().id()) + ? volumesToProfiles.at(entry.volume().id()) : Option::none(), - entry.volume_info().id(), - entry.volume_info().attributes().empty() + entry.volume().id(), + entry.volume().attributes().empty() ? Option::none() - : convertStringMapToLabels( - entry.volume_info().attributes())); + : convertStringMapToLabels(entry.volume().attributes())); } return resources; @@ -2546,20 +2552,21 @@ Future StorageLocalResourceProviderProcess::listVolumes() } +// NOTE: This can only be called after `prepareControllerService` and +// the resource provider ID has been obtained. Future StorageLocalResourceProviderProcess::getCapacities() { - // NOTE: This can only be called after `prepareControllerService` and - // the resource provider ID has been obtained. - CHECK_SOME(controllerCapabilities); CHECK(info.has_id()); // This is only used for reconciliation so no failure is returned. - if (!controllerCapabilities->getCapacity) { + if (!controllerCapabilities.getCapacity) { return Resources(); } - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { list> futures; foreach (const string& profile, knownProfiles) { @@ -2570,14 +2577,13 @@ Future StorageLocalResourceProviderProcess::getCapacities() const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile); - csi::GetCapacityRequest request; - request.mutable_version()->CopyFrom(csiVersion); + csi::v0::GetCapacityRequest request; request.add_volume_capabilities()->CopyFrom(profileInfo.capability); *request.mutable_parameters() = profileInfo.parameters; futures.push_back(client.GetCapacity(request) .then(defer(self(), [=]( - const csi::GetCapacityResponse& response) -> Resources { + const csi::v0::GetCapacityResponse& response) -> Resources { if (response.available_capacity() == 0) { return Resources(); } @@ -3218,26 +3224,22 @@ Try> StorageLocalResourceProvider::create( "' does not follow Java package naming convention"); } - bool hasControllerService = false; + // Verify that the plugin provides the CSI node service. + // TODO(chhsiao): We should move this check to a validation function + // for `CSIPluginInfo`. bool hasNodeService = false; foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { - for (int i = 0; i < container.services_size(); i++) { - const CSIPluginContainerInfo::Service service = container.services(i); - if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) { - hasControllerService = true; - } else if (service == CSIPluginContainerInfo::NODE_SERVICE) { - hasNodeService = true; - } + if (container.services().end() != find( + container.services().begin(), + container.services().end(), + CSIPluginContainerInfo::NODE_SERVICE)) { + hasNodeService = true; + break; } } - if (!hasControllerService) { - return Error( - stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found"); - } - if (!hasNodeService) { return Error( stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");