Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 90B2F10FBF for ; Wed, 26 Nov 2014 00:22:02 +0000 (UTC) Received: (qmail 81410 invoked by uid 500); 26 Nov 2014 00:22:02 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 81343 invoked by uid 500); 26 Nov 2014 00:22:02 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 81334 invoked by uid 99); 26 Nov 2014 00:22:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Nov 2014 00:22:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DB640A1AEBC; Wed, 26 Nov 2014 00:22:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kasha@apache.org To: common-commits@hadoop.apache.org Message-Id: <3f28907ae2534a59bf8157bf7655e5fe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-2188. [YARN-1492] Client service for cache manager. (Chris Trezzo and Sangjin Lee via kasha) Date: Wed, 26 Nov 2014 00:22:01 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 8a7ca13b1 -> fe1f2db5e YARN-2188. [YARN-1492] Client service for cache manager. 
(Chris Trezzo and Sangjin Lee via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe1f2db5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe1f2db5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe1f2db5 Branch: refs/heads/trunk Commit: fe1f2db5ee13920925ee4728dfbbb48fe670ee14 Parents: 8a7ca13 Author: Karthik Kambatla Authored: Tue Nov 25 16:21:29 2014 -0800 Committer: Karthik Kambatla Committed: Tue Nov 25 16:21:46 2014 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 5 +- .../hadoop-yarn/hadoop-yarn-api/pom.xml | 1 + .../hadoop/yarn/api/ClientSCMProtocol.java | 90 ++++++ .../hadoop/yarn/api/ClientSCMProtocolPB.java | 28 ++ .../ReleaseSharedCacheResourceRequest.java | 67 +++++ .../ReleaseSharedCacheResourceResponse.java | 37 +++ .../UseSharedCacheResourceRequest.java | 70 +++++ .../UseSharedCacheResourceResponse.java | 55 ++++ .../hadoop/yarn/conf/YarnConfiguration.java | 12 + .../src/main/proto/client_SCM_protocol.proto | 30 ++ .../src/main/proto/yarn_service_protos.proto | 21 ++ .../client/ClientSCMProtocolPBClientImpl.java | 93 +++++++ .../service/ClientSCMProtocolPBServiceImpl.java | 78 ++++++ ...ReleaseSharedCacheResourceRequestPBImpl.java | 122 ++++++++ ...eleaseSharedCacheResourceResponsePBImpl.java | 53 ++++ .../pb/UseSharedCacheResourceRequestPBImpl.java | 120 ++++++++ .../UseSharedCacheResourceResponsePBImpl.java | 79 ++++++ .../src/main/resources/yarn-default.xml | 23 +- .../ClientProtocolService.java | 192 +++++++++++++ .../sharedcachemanager/SharedCacheManager.java | 7 + .../metrics/ClientSCMMetrics.java | 113 ++++++++ .../TestClientSCMProtocolService.java | 278 +++++++++++++++++++ 22 files changed, 1570 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 095bfc0..e9fa63e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -46,7 +46,10 @@ Release 2.7.0 - UNRELEASED (Chris Trezzo and Sangjin Lee via kasha) YARN-2236. [YARN-1492] Shared Cache uploader service on the Node - Manager. (Chris Trezzo and Sanjin Lee via kasha) + Manager. (Chris Trezzo and Sangjin Lee via kasha) + + YARN-2188. [YARN-1492] Client service for cache manager. + (Chris Trezzo and Sangjin Lee via kasha) IMPROVEMENTS http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml index affbe03..5e2278d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml @@ -96,6 +96,7 @@ server/resourcemanager_administration_protocol.proto application_history_client.proto server/application_history_server.proto + client_SCM_protocol.proto ${project.build.directory}/generated-sources/java http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java new file mode 100644 index 0000000..d63fa11 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocol.java @@ -0,0 +1,90 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + *

<p> + * The protocol between clients and the SharedCacheManager to claim + * and release resources in the shared cache. + * </p>

+ */ +@Public +@Unstable +public interface ClientSCMProtocol { + /** + *

<p> + * The interface used by clients to claim a resource with the + * SharedCacheManager. The client uses a checksum to identify the + * resource and an {@link ApplicationId} to identify which application will be + * using the resource. + * </p>

+ * + *

<p> + * The SharedCacheManager responds with whether or not the + * resource exists in the cache. If the resource exists, a Path + * to the resource in the shared cache is returned. If the resource does not + * exist, the response is empty. + * </p>

+ * + * @param request request to claim a resource in the shared cache + * @return response indicating if the resource is already in the cache + * @throws YarnException + * @throws IOException + */ + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, IOException; + + /** + *

<p> + * The interface used by clients to release a resource with the + * SharedCacheManager. This method is called once an application + * is no longer using a claimed resource in the shared cache. The client uses + * a checksum to identify the resource and an {@link ApplicationId} to + * identify which application is releasing the resource. + * </p>

+ * + *

<p> + * Note: This method is an optimization and the client is not required to call + * it for correctness. + * </p>

+ * + *

<p> + * Currently the SharedCacheManager sends an empty response. + * </p>

+ * + * @param request request to release a resource in the shared cache + * @return (empty) response on releasing the resource + * @throws YarnException + * @throws IOException + */ + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java new file mode 100644 index 0000000..b0a9fb5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientSCMProtocolPB.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.ClientSCMProtocol.ClientSCMProtocolService; + +@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ClientSCMProtocolPB", + protocolVersion = 1) +public interface ClientSCMProtocolPB extends + ClientSCMProtocolService.BlockingInterface { + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java new file mode 100644 index 0000000..a8d36b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceRequest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + *

 <p>The request from clients to release a resource in the shared cache.</p>

+ */ +@Public +@Unstable +public abstract class ReleaseSharedCacheResourceRequest { + + /** + * Get the ApplicationId of the resource to be released. + * + * @return ApplicationId + */ + @Public + @Unstable + public abstract ApplicationId getAppId(); + + /** + * Set the ApplicationId of the resource to be released. + * + * @param id ApplicationId + */ + @Public + @Unstable + public abstract void setAppId(ApplicationId id); + + /** + * Get the key of the resource to be released. + * + * @return key + */ + @Public + @Unstable + public abstract String getResourceKey(); + + /** + * Set the key of the resource to be released. + * + * @param key unique identifier for the resource + */ + @Public + @Unstable + public abstract void setResourceKey(String key); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java new file mode 100644 index 0000000..c075e74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ReleaseSharedCacheResourceResponse.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

<p> + * The response to clients from the SharedCacheManager when + * releasing a resource in the shared cache. + * </p>

+ * + *

<p> + * Currently, this is empty. + * </p>

+ */ +@Public +@Unstable +public abstract class ReleaseSharedCacheResourceResponse { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java new file mode 100644 index 0000000..bd42b7d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceRequest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + *

<p> + * The request from clients to the SharedCacheManager that claims a + * resource in the shared cache. + * </p>

+ */ +@Public +@Unstable +public abstract class UseSharedCacheResourceRequest { + + /** + * Get the ApplicationId of the resource to be used. + * + * @return ApplicationId + */ + @Public + @Unstable + public abstract ApplicationId getAppId(); + + /** + * Set the ApplicationId of the resource to be used. + * + * @param id ApplicationId + */ + @Public + @Unstable + public abstract void setAppId(ApplicationId id); + + /** + * Get the key of the resource to be used. + * + * @return key + */ + @Public + @Unstable + public abstract String getResourceKey(); + + /** + * Set the key of the resource to be used. + * + * @param key unique identifier for the resource + */ + @Public + @Unstable + public abstract void setResourceKey(String key); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java new file mode 100644 index 0000000..87fb43b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UseSharedCacheResourceResponse.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

<p> + * The response from the SharedCacheManager to the client that indicates whether + * a requested resource exists in the cache. + * </p>

+ */ +@Public +@Unstable +public abstract class UseSharedCacheResourceResponse { + + /** + * Get the Path corresponding to the requested resource in the + * shared cache. + * + * @return String A Path if the resource exists in the shared + * cache, null otherwise + */ + @Public + @Unstable + public abstract String getPath(); + + /** + * Set the Path corresponding to a resource in the shared cache. + * + * @param p A Path corresponding to a resource in the shared + * cache + */ + @Public + @Unstable + public abstract void setPath(String p); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4b4f581..52bc821 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1472,6 +1472,18 @@ public class YarnConfiguration extends Configuration { SHARED_CACHE_PREFIX + "uploader.server.thread-count"; public static final int DEFAULT_SCM_UPLOADER_SERVER_THREAD_COUNT = 50; + /** The address of the client interface in the SCM. */ + public static final String SCM_CLIENT_SERVER_ADDRESS = + SHARED_CACHE_PREFIX + "client-server.address"; + public static final int DEFAULT_SCM_CLIENT_SERVER_PORT = 8045; + public static final String DEFAULT_SCM_CLIENT_SERVER_ADDRESS = "0.0.0.0:" + + DEFAULT_SCM_CLIENT_SERVER_PORT; + + /** The number of threads used to handle shared cache manager requests. 
*/ + public static final String SCM_CLIENT_SERVER_THREAD_COUNT = + SHARED_CACHE_PREFIX + "client-server.thread-count"; + public static final int DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT = 50; + /** the checksum algorithm implementation **/ public static final String SHARED_CACHE_CHECKSUM_ALGO_IMPL = SHARED_CACHE_PREFIX + "checksum.algo.impl"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto new file mode 100644 index 0000000..fbc3c42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/client_SCM_protocol.proto @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "ClientSCMProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; + +service ClientSCMProtocolService { + rpc use (UseSharedCacheResourceRequestProto) returns (UseSharedCacheResourceResponseProto); + rpc release (ReleaseSharedCacheResourceRequestProto) returns (ReleaseSharedCacheResourceResponseProto); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 1bde69a..10f5b9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -307,6 +307,27 @@ message GetContainersResponseProto { } ////////////////////////////////////////////////////// +/////// client_SCM_Protocol ////////////////////////// +////////////////////////////////////////////////////// + +message UseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message UseSharedCacheResourceResponseProto { + optional string path = 1; +} + +message ReleaseSharedCacheResourceRequestProto { + optional ApplicationIdProto applicationId = 1; + optional string resourceKey = 2; +} + +message ReleaseSharedCacheResourceResponseProto { +} + +////////////////////////////////////////////////////// // reservation_protocol ////////////////////////////////////////////////////// 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java new file mode 100644 index 0000000..79bfaca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientSCMProtocolPBClientImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.ClientSCMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; + +import com.google.protobuf.ServiceException; + +public class ClientSCMProtocolPBClientImpl implements ClientSCMProtocol, + Closeable { + + private ClientSCMProtocolPB proxy; + + public ClientSCMProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, ClientSCMProtocolPB.class, + ProtobufRpcEngine.class); + proxy = RPC.getProxy(ClientSCMProtocolPB.class, clientVersion, addr, conf); + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + this.proxy = 
null; + } + } + + @Override + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, IOException { + UseSharedCacheResourceRequestProto requestProto = + ((UseSharedCacheResourceRequestPBImpl) request).getProto(); + try { + return new UseSharedCacheResourceResponsePBImpl(proxy.use(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, + IOException { + ReleaseSharedCacheResourceRequestProto requestProto = + ((ReleaseSharedCacheResourceRequestPBImpl) request).getProto(); + try { + return new ReleaseSharedCacheResourceResponsePBImpl(proxy.release(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java new file mode 100644 index 0000000..65b3581 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ClientSCMProtocolPBServiceImpl.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.ClientSCMProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReleaseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UseSharedCacheResourceResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class ClientSCMProtocolPBServiceImpl implements ClientSCMProtocolPB { + + private ClientSCMProtocol real; + + 
public ClientSCMProtocolPBServiceImpl(ClientSCMProtocol impl) { + this.real = impl; + } + + @Override + public UseSharedCacheResourceResponseProto use(RpcController controller, + UseSharedCacheResourceRequestProto proto) throws ServiceException { + UseSharedCacheResourceRequestPBImpl request = + new UseSharedCacheResourceRequestPBImpl(proto); + try { + UseSharedCacheResourceResponse response = real.use(request); + return ((UseSharedCacheResourceResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReleaseSharedCacheResourceResponseProto release( + RpcController controller, ReleaseSharedCacheResourceRequestProto proto) + throws ServiceException { + ReleaseSharedCacheResourceRequestPBImpl request = + new ReleaseSharedCacheResourceRequestPBImpl(proto); + try { + ReleaseSharedCacheResourceResponse response = real.release(request); + return ((ReleaseSharedCacheResourceResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java new file mode 100644 index 0000000..d16ef78 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceRequestPBImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceRequestProtoOrBuilder; + +public class ReleaseSharedCacheResourceRequestPBImpl extends + ReleaseSharedCacheResourceRequest { + ReleaseSharedCacheResourceRequestProto proto = + ReleaseSharedCacheResourceRequestProto.getDefaultInstance(); + ReleaseSharedCacheResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public ReleaseSharedCacheResourceRequestPBImpl() { + builder = 
ReleaseSharedCacheResourceRequestProto.newBuilder(); + } + + public ReleaseSharedCacheResourceRequestPBImpl( + ReleaseSharedCacheResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReleaseSharedCacheResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ApplicationId getAppId() { + ReleaseSharedCacheResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setAppId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) + builder.clearApplicationId(); + this.applicationId = id; + } + + @Override + public String getResourceKey() { + ReleaseSharedCacheResourceRequestProtoOrBuilder p = + viaProto ? proto : builder; + return (p.hasResourceKey()) ? 
p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToBuilder() { + if (applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReleaseSharedCacheResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java new file mode 100644 index 0000000..559f2c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ReleaseSharedCacheResourceResponsePBImpl.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReleaseSharedCacheResourceResponseProto; + +public class ReleaseSharedCacheResourceResponsePBImpl extends + ReleaseSharedCacheResourceResponse { + ReleaseSharedCacheResourceResponseProto proto = + ReleaseSharedCacheResourceResponseProto.getDefaultInstance(); + ReleaseSharedCacheResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + public ReleaseSharedCacheResourceResponsePBImpl() { + builder = ReleaseSharedCacheResourceResponseProto.newBuilder(); + } + + public ReleaseSharedCacheResourceResponsePBImpl( + ReleaseSharedCacheResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReleaseSharedCacheResourceResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReleaseSharedCacheResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java new file mode 100644 index 0000000..2a134b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceRequestPBImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceRequestProtoOrBuilder; + +public class UseSharedCacheResourceRequestPBImpl extends + UseSharedCacheResourceRequest { + UseSharedCacheResourceRequestProto proto = UseSharedCacheResourceRequestProto + .getDefaultInstance(); + UseSharedCacheResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + + public UseSharedCacheResourceRequestPBImpl() { + builder = UseSharedCacheResourceRequestProto.newBuilder(); + } + + public UseSharedCacheResourceRequestPBImpl( + UseSharedCacheResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UseSharedCacheResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ApplicationId getAppId() { + UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setAppId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) + builder.clearApplicationId(); + this.applicationId = id; + } + + @Override + public String getResourceKey() { + UseSharedCacheResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasResourceKey()) ? 
p.getResourceKey() : null; + } + + @Override + public void setResourceKey(String key) { + maybeInitBuilder(); + if (key == null) { + builder.clearResourceKey(); + return; + } + builder.setResourceKey(key); + } + + private void mergeLocalToBuilder() { + if (applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UseSharedCacheResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java new file mode 100644 index 0000000..0dd44c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UseSharedCacheResourceResponsePBImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UseSharedCacheResourceResponseProtoOrBuilder; + +public class UseSharedCacheResourceResponsePBImpl extends + UseSharedCacheResourceResponse { + UseSharedCacheResourceResponseProto proto = + UseSharedCacheResourceResponseProto + .getDefaultInstance(); + UseSharedCacheResourceResponseProto.Builder builder = null; + boolean viaProto = false; + + public UseSharedCacheResourceResponsePBImpl() { + builder = UseSharedCacheResourceResponseProto.newBuilder(); + } + + public UseSharedCacheResourceResponsePBImpl( + UseSharedCacheResourceResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UseSharedCacheResourceResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public String getPath() { + UseSharedCacheResourceResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasPath()) ? 
p.getPath() : null; + } + + @Override + public void setPath(String path) { + maybeInitBuilder(); + if (path == null) { + builder.clearPath(); + return; + } + builder.setPath(path); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UseSharedCacheResourceResponseProto.newBuilder(proto); + } + viaProto = false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index af3b5aa..0ff989e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1459,19 +1459,36 @@ - The algorithm used to compute checksums of files (SHA-256 by default) + The address of the client interface in the SCM + (shared cache manager) + yarn.sharedcache.client-server.address + 0.0.0.0:8045 + + + + The number of threads used to handle shared cache manager + requests from clients (50 by default) + yarn.sharedcache.client-server.thread-count + 50 + + + + The algorithm used to compute checksums of files (SHA-256 by + default) yarn.sharedcache.checksum.algo.impl org.apache.hadoop.yarn.sharedcache.ChecksumSHA256Impl - The replication factor for the node manager uploader for the shared cache (10 by default) + The replication factor for the node manager uploader for the + shared cache (10 by default) yarn.sharedcache.nm.uploader.replication.factor 10 - The number of threads used to upload files from a node manager instance (20 by default) + The 
number of threads used to upload files from a node manager + instance (20 by default) yarn.sharedcache.nm.uploader.thread-count 20 http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java new file mode 100644 index 0000000..bd13573 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/ClientProtocolService.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ClientSCMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; +import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore; +import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference; + +/** + * This service handles all rpc calls from the client to the shared cache + * manager. 
+ */ +@Private +@Evolving +public class ClientProtocolService extends AbstractService implements + ClientSCMProtocol { + + private static final Log LOG = LogFactory.getLog(ClientProtocolService.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private Server server; + InetSocketAddress clientBindAddress; + private final SCMStore store; + private int cacheDepth; + private String cacheRoot; + private ClientSCMMetrics metrics; + + public ClientProtocolService(SCMStore store) { + super(ClientProtocolService.class.getName()); + this.store = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.clientBindAddress = getBindAddress(conf); + + this.cacheDepth = SharedCacheUtil.getCacheDepth(conf); + + this.cacheRoot = + conf.get(YarnConfiguration.SHARED_CACHE_ROOT, + YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); + + super.serviceInit(conf); + } + + InetSocketAddress getBindAddress(Configuration conf) { + return conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT); + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + this.metrics = ClientSCMMetrics.initSingleton(conf); + + YarnRPC rpc = YarnRPC.create(conf); + this.server = + rpc.getServer(ClientSCMProtocol.class, this, + clientBindAddress, + conf, null, // Secret manager null for now (security not supported) + conf.getInt(YarnConfiguration.SCM_CLIENT_SERVER_THREAD_COUNT, + YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_THREAD_COUNT)); + + // TODO (YARN-2774): Enable service authorization + + this.server.start(); + clientBindAddress = + conf.updateConnectAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS, + server.getListenerAddress()); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.server != null) { + 
this.server.stop(); + } + + super.serviceStop(); + } + + @Override + public UseSharedCacheResourceResponse use( + UseSharedCacheResourceRequest request) throws YarnException, + IOException { + + UseSharedCacheResourceResponse response = + recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + String fileName = + this.store.addResourceReference(request.getResourceKey(), + new SharedCacheResourceReference(request.getAppId(), + callerUGI.getShortUserName())); + + if (fileName != null) { + response + .setPath(getCacheEntryFilePath(request.getResourceKey(), fileName)); + this.metrics.incCacheHitCount(); + } else { + this.metrics.incCacheMissCount(); + } + + return response; + } + + @Override + public ReleaseSharedCacheResourceResponse release( + ReleaseSharedCacheResourceRequest request) throws YarnException, + IOException { + + ReleaseSharedCacheResourceResponse response = + recordFactory + .newRecordInstance(ReleaseSharedCacheResourceResponse.class); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + boolean removed = + this.store.removeResourceReference( + request.getResourceKey(), + new SharedCacheResourceReference(request.getAppId(), callerUGI + .getShortUserName()), true); + + if (removed) { + this.metrics.incCacheRelease(); + } + + return response; + } + + private String getCacheEntryFilePath(String checksum, String filename) { + return SharedCacheUtil.getCacheEntryPath(this.cacheDepth, + this.cacheRoot, checksum) + Path.SEPARATOR_CHAR + filename; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java index ab50727..c54e470 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -71,6 +71,9 @@ public class SharedCacheManager extends CompositeService { createNMCacheUploaderSCMProtocolService(store); addService(nms); + ClientProtocolService cps = createClientProtocolService(store); + addService(cps); + // init metrics DefaultMetricsSystem.initialize("SharedCacheManager"); JvmMetrics.initSingleton("SharedCacheManager", null); @@ -106,6 +109,10 @@ public class SharedCacheManager extends CompositeService { return new SharedCacheUploaderService(store); } + private ClientProtocolService createClientProtocolService(SCMStore store) { + return new ClientProtocolService(store); + } + @Override protected void serviceStop() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java new file mode 100644 index 0000000..3ae88e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/metrics/ClientSCMMetrics.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.sharedcachemanager.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; + +/** + * This class is for maintaining client requests metrics + * and publishing them through the metrics interfaces. + */ +@Private +@Unstable +@Metrics(about="Client SCM metrics", context="yarn") +public class ClientSCMMetrics { + + private static final Log LOG = LogFactory.getLog(ClientSCMMetrics.class); + final MetricsRegistry registry; + + ClientSCMMetrics() { + registry = new MetricsRegistry("clientRequests"); + LOG.debug("Initialized " + registry); + } + + enum Singleton { + INSTANCE; + + ClientSCMMetrics impl; + + synchronized ClientSCMMetrics init(Configuration conf) { + if (impl == null) { + impl = create(); + } + return impl; + } + } + + public static ClientSCMMetrics initSingleton(Configuration conf) { + return Singleton.INSTANCE.init(conf); + } + + public static ClientSCMMetrics getInstance() { + ClientSCMMetrics topMetrics = Singleton.INSTANCE.impl; + if (topMetrics == null) { + throw new IllegalStateException( + "The ClientSCMMetrics singleton instance is not initialized." 
+ + " Have you called init first?"); + } + return topMetrics; + } + + static ClientSCMMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + + ClientSCMMetrics metrics = new ClientSCMMetrics(); + ms.register("clientRequests", null, metrics); + return metrics; + } + + @Metric("Number of cache hits") MutableCounterLong cacheHits; + @Metric("Number of cache misses") MutableCounterLong cacheMisses; + @Metric("Number of cache releases") MutableCounterLong cacheReleases; + + /** + * One cache hit event + */ + public void incCacheHitCount() { + cacheHits.incr(); + } + + /** + * One cache miss event + */ + public void incCacheMissCount() { + cacheMisses.incr(); + } + + /** + * One cache release event + */ + public void incCacheRelease() { + cacheReleases.incr(); + } + + public long getCacheHits() { return cacheHits.value(); } + public long getCacheMisses() { return cacheMisses.value(); } + public long getCacheReleases() { return cacheReleases.value(); } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe1f2db5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java new file mode 100644 index 0000000..68f9851 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestClientSCMProtocolService.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.sharedcachemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ClientSCMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReleaseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UseSharedCacheResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.ClientSCMMetrics;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.InMemorySCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
+import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Basic unit tests for the Client to SCM Protocol Service.
+ *
+ * Each test starts an in-memory store and a {@link ClientProtocolService},
+ * talks to the service through an RPC proxy, and checks both the store
+ * contents and the change in the {@link ClientSCMMetrics} counters.
+ */
+public class TestClientSCMProtocolService {
+  private static File testDir = null;
+
+  /**
+   * Creates the directory used as the shared cache root. It is named after
+   * this test class so that it is not shared with other tests.
+   */
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testDir = new File("target",
+        TestClientSCMProtocolService.class.getCanonicalName());
+    testDir.delete();
+    testDir.mkdirs();
+    testDir = testDir.getAbsoluteFile();
+  }
+
+  /** Removes the shared cache root directory created for this test class. */
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testDir != null) {
+      testDir.delete();
+    }
+  }
+
+
+  private ClientProtocolService service;
+  private ClientSCMProtocol clientSCMProxy;
+  private SCMStore store;
+  private final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  /**
+   * Starts the store, then the service on top of it, and finally creates the
+   * client proxy pointing at the configured SCM client server address.
+   */
+  @Before
+  public void startUp() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.SCM_STORE_CLASS,
+        InMemorySCMStore.class.getName());
+    conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
+    AppChecker appChecker = mock(AppChecker.class);
+    store = new InMemorySCMStore(appChecker);
+    store.init(conf);
+    store.start();
+
+    service = new ClientProtocolService(store);
+    service.init(conf);
+    service.start();
+
+    // Use the same configuration for the client side as for the server.
+    YarnRPC rpc = YarnRPC.create(conf);
+
+    InetSocketAddress scmAddress =
+        conf.getSocketAddr(YarnConfiguration.SCM_CLIENT_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_ADDRESS,
+            YarnConfiguration.DEFAULT_SCM_CLIENT_SERVER_PORT);
+
+    clientSCMProxy =
+        (ClientSCMProtocol) rpc.getProxy(ClientSCMProtocol.class, scmAddress,
+            conf);
+  }
+
+  /**
+   * Tears everything down in the reverse order of {@link #startUp()}: the
+   * proxy first, then the service, and the store last, so that the store is
+   * not stopped while the service that uses it is still running.
+   */
+  @After
+  public void cleanUp() {
+    if (clientSCMProxy != null) {
+      RPC.stopProxy(clientSCMProxy);
+      clientSCMProxy = null;
+    }
+
+    if (service != null) {
+      service.stop();
+      service = null;
+    }
+
+    if (store != null) {
+      store.stop();
+      store = null;
+    }
+  }
+
+  /** Using a key that is not in the store returns no path and counts a miss. */
+  @Test
+  public void testUse_MissingEntry() throws Exception {
+    long misses = ClientSCMMetrics.getInstance().getCacheMisses();
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(1, 1L));
+    assertNull(clientSCMProxy.use(request).getPath());
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheMisses() - misses);
+  }
+
+  /** Using an existing entry with no references adds one and counts a hit. */
+  @Test
+  public void testUse_ExistingEntry_NoAppIds() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+
+    long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(2, 2L));
+    // Expecting default depth of 3 and under the shared cache root dir
+    String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+    assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+    assertEquals(1, store.getResourceReferences("key1").size());
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheHits() - hits);
+
+  }
+
+  /** Using an existing entry from a second app adds a second reference. */
+  @Test
+  public void testUse_ExistingEntry_OneId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L), "user"));
+    assertEquals(1, store.getResourceReferences("key1").size());
+    long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+    // Add a new distinct appId
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(2, 2L));
+
+    // Expecting default depth of 3 under the shared cache root dir
+    String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+    assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+    assertEquals(2, store.getResourceReferences("key1").size());
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheHits() - hits);
+  }
+
+  /** Using an entry again with the same app and user keeps one reference. */
+  @Test
+  public void testUse_ExistingEntry_DupId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L),
+            testUGI.getShortUserName()));
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    long hits = ClientSCMMetrics.getInstance().getCacheHits();
+
+    // Add a new duplicate appId
+    UseSharedCacheResourceRequest request =
+        recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(1, 1L));
+
+    // Expecting default depth of 3 under the shared cache root dir
+    String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar";
+    assertEquals(expectedPath, clientSCMProxy.use(request).getPath());
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheHits() - hits);
+  }
+
+  /** Releasing with an app id that holds no reference changes nothing. */
+  @Test
+  public void testRelease_ExistingEntry_NonExistantAppId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L), "user"));
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+    ReleaseSharedCacheResourceRequest request =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(2, 2L));
+    clientSCMProxy.release(request);
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    assertEquals(
+        "Client SCM metrics were updated when a release did not happen", 0,
+        ClientSCMMetrics.getInstance().getCacheReleases() - releases);
+
+  }
+
+  /** Releasing a held reference removes it and counts one release. */
+  @Test
+  public void testRelease_ExistingEntry_WithAppId() throws Exception {
+    // Pre-populate the SCM with one cache entry
+    store.addResource("key1", "foo.jar");
+    UserGroupInformation testUGI = UserGroupInformation.getCurrentUser();
+    store.addResourceReference("key1",
+        new SharedCacheResourceReference(createAppId(1, 1L),
+            testUGI.getShortUserName()));
+    assertEquals(1, store.getResourceReferences("key1").size());
+
+    long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+    ReleaseSharedCacheResourceRequest request =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+    request.setResourceKey("key1");
+    request.setAppId(createAppId(1, 1L));
+    clientSCMProxy.release(request);
+    assertEquals(0, store.getResourceReferences("key1").size());
+
+    assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics
+        .getInstance().getCacheReleases() - releases);
+
+  }
+
+  /** Releasing a key that is not in the store counts no release. */
+  @Test
+  public void testRelease_MissingEntry() throws Exception {
+
+    long releases = ClientSCMMetrics.getInstance().getCacheReleases();
+
+    ReleaseSharedCacheResourceRequest request =
+        recordFactory
+            .newRecordInstance(ReleaseSharedCacheResourceRequest.class);
+    request.setResourceKey("key2");
+    request.setAppId(createAppId(2, 2L));
+    clientSCMProxy.release(request);
+    assertNotNull(store.getResourceReferences("key2"));
+    assertEquals(0, store.getResourceReferences("key2").size());
+    assertEquals(
+        "Client SCM metrics were updated when a release did not happen.", 0,
+        ClientSCMMetrics.getInstance().getCacheReleases() - releases);
+  }
+
+  /**
+   * Builds an application id for the tests.
+   *
+   * @param id the application's sequence number
+   * @param timestamp the cluster timestamp part of the id
+   */
+  private ApplicationId createAppId(int id, long timestamp) {
+    return ApplicationId.newInstance(timestamp, id);
+  }
+}