Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0A397200D34 for ; Fri, 3 Nov 2017 16:28:17 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 08D67160BFC; Fri, 3 Nov 2017 15:28:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F3D3A160BE9 for ; Fri, 3 Nov 2017 16:28:15 +0100 (CET) Received: (qmail 200 invoked by uid 500); 3 Nov 2017 15:28:15 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 191 invoked by uid 99); 3 Nov 2017 15:28:15 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Nov 2017 15:28:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 61B1B1808CA for ; Fri, 3 Nov 2017 15:28:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -100.002 X-Spam-Level: X-Spam-Status: No, score=-100.002 tagged_above=-999 required=6.31 tests=[RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id hPbXhQEoZ9Js for ; Fri, 3 Nov 2017 15:28:10 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 39F5B61177 for ; Fri, 3 Nov 2017 15:28:10 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id BCE6AE259D for ; Fri, 3 Nov 2017 15:28:08 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 0323E241B8 for ; Fri, 3 Nov 2017 15:28:07 +0000 (UTC) Date: Fri, 3 Nov 2017 15:28:07 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Fri, 03 Nov 2017 15:28:17 -0000 [ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237751#comment-16237751 ] ASF GitHub Bot commented on FLINK-7076: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148811288 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * General tests for the YARN resource manager component. + */ +public class YarnResourceManagerTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class); + + private static Configuration flinkConfig = new Configuration(); + + private static Map env = new HashMap<>(); + + private static final Time timeout = Time.seconds(10L); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() { + flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100); + File root = folder.getRoot(); + File home = new File(root, "home"); + boolean created = home.mkdir(); + assertTrue(created); + + env.put(ENV_APP_ID, "foo"); + env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()); + env.put(ENV_CLIENT_SHIP_FILES, ""); + env.put(ENV_FLINK_CLASSPATH, ""); + env.put(ENV_HADOOP_USER_NAME, "foo"); + env.put(FLINK_JAR_PATH, root.toURI().toString()); + } + + @After + public void teardown() { + env.clear(); + } + + static class TestingYarnResourceManager extends YarnResourceManager { + public AMRMClientAsync mockResourceManagerClient; + public NMClient mockNMClient; + + public TestingYarnResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, + Map env, + ResourceManagerConfiguration resourceManagerConfiguration, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + FatalErrorHandler fatalErrorHandler, + AMRMClientAsync mockResourceManagerClient, + NMClient mockNMClient) { + super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env, + resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, + slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler); + this.mockNMClient = mockNMClient; + this.mockResourceManagerClient = mockResourceManagerClient; + } + + public void runInMainThread(Runnable runnable) { + super.getMainThreadExecutor().execute(runnable); + } + + @Override + protected AMRMClientAsync createAndStartResourceManagerClient() { + return mockResourceManagerClient; + } + + @Override + protected NMClient createAndStartNodeManagerClient() { + return mockNMClient; + } + } + + static class Context { + + // services + TestingRpcService rpcService; + TestingFatalErrorHandler fatalErrorHandler; + MockResourceManagerRuntimeServices rmServices; + + // RM + ResourceManagerConfiguration rmConfiguration; + ResourceID rmResourceID; + static final String RM_ADDRESS = "resourceManager"; + TestingYarnResourceManager resourceManager; + + // domain objects for test purposes + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200); + + public ContainerId task = ContainerId.newInstance( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1); + public String taskHost = "host1"; + + SlotReport slotReport = new SlotReport(); + + public NMClient mockNMClient = mock(NMClient.class); + public AMRMClientAsync mockResourceManagerClient = + mock(AMRMClientAsync.class); + + /** + * Create mock RM dependencies. + */ + Context() throws Exception { + rpcService = new TestingRpcService(); + fatalErrorHandler = new TestingFatalErrorHandler(); + rmServices = new MockResourceManagerRuntimeServices(); + + // resource manager + rmConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L)); + rmResourceID = ResourceID.generate(); + resourceManager = + new TestingYarnResourceManager( + rpcService, + RM_ADDRESS, + rmResourceID, + flinkConfig, + env, + rmConfiguration, + rmServices.highAvailabilityServices, + rmServices.heartbeatServices, + rmServices.slotManager, + rmServices.metricRegistry, + rmServices.jobLeaderIdService, + fatalErrorHandler, + mockResourceManagerClient, + mockNMClient + ); + } + + /** + * Mock services needed by the resource manager. + */ + class MockResourceManagerRuntimeServices { + + public final ScheduledExecutor scheduledExecutor; + public final TestingHighAvailabilityServices highAvailabilityServices; + public final HeartbeatServices heartbeatServices; + public final MetricRegistry metricRegistry; + public final TestingLeaderElectionService rmLeaderElectionService; + public final JobLeaderIdService jobLeaderIdService; + public final SlotManager slotManager; + + public UUID rmLeaderSessionId; + + MockResourceManagerRuntimeServices() throws Exception { + scheduledExecutor = mock(ScheduledExecutor.class); + highAvailabilityServices = new TestingHighAvailabilityServices(); + rmLeaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); + metricRegistry = mock(MetricRegistry.class); --- End diff -- here we could use the `NoOpMetricRegistry`. > Implement container release to support dynamic scaling > ------------------------------------------------------ > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Reporter: Till Rohrmann > Assignee: Shuyi Chen > Priority: Major > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be able to dynamically free containers. We have to implement the {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)