Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B475200D40 for ; Fri, 3 Nov 2017 16:27:57 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1A048160BE9; Fri, 3 Nov 2017 15:27:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 10BD2160BFC for ; Fri, 3 Nov 2017 16:27:55 +0100 (CET) Received: (qmail 96595 invoked by uid 500); 3 Nov 2017 15:27:55 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 96576 invoked by uid 99); 3 Nov 2017 15:27:55 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Nov 2017 15:27:55 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 14D321807D1 for ; Fri, 3 Nov 2017 15:27:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id HjPL7VeIrBlE for ; Fri, 3 Nov 2017 15:27:51 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) 
with SMTP id 0119D60F5E for ; Fri, 3 Nov 2017 15:27:49 +0000 (UTC) Received: (qmail 94746 invoked by uid 99); 3 Nov 2017 15:27:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Nov 2017 15:27:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2537AE004D; Fri, 3 Nov 2017 15:27:48 +0000 (UTC) From: tillrohrmann To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #4729: [FLINK-7076] [ResourceManager] implement YARN stop... Content-Type: text/plain Message-Id: <20171103152748.2537AE004D@git1-us-west.apache.org> Date: Fri, 3 Nov 2017 15:27:48 +0000 (UTC) archived-at: Fri, 03 Nov 2017 15:27:57 -0000 Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4729#discussion_r148812650 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; + +import org.apache.flink.util.TestLogger; + +import 
org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * General tests for the YARN resource manager component. 
+ */ +public class YarnResourceManagerTest extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class); + + private static Configuration flinkConfig = new Configuration(); + + private static Map env = new HashMap<>(); + + private static final Time timeout = Time.seconds(10L); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() { + flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100); + File root = folder.getRoot(); + File home = new File(root, "home"); + boolean created = home.mkdir(); + assertTrue(created); + + env.put(ENV_APP_ID, "foo"); + env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()); + env.put(ENV_CLIENT_SHIP_FILES, ""); + env.put(ENV_FLINK_CLASSPATH, ""); + env.put(ENV_HADOOP_USER_NAME, "foo"); + env.put(FLINK_JAR_PATH, root.toURI().toString()); + } + + @After + public void teardown() { + env.clear(); + } + + static class TestingYarnResourceManager extends YarnResourceManager { + public AMRMClientAsync mockResourceManagerClient; + public NMClient mockNMClient; + + public TestingYarnResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, + Map env, + ResourceManagerConfiguration resourceManagerConfiguration, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + FatalErrorHandler fatalErrorHandler, + AMRMClientAsync mockResourceManagerClient, + NMClient mockNMClient) { + super(rpcService, resourceManagerEndpointId, resourceId, flinkConfig, env, + resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, + slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler); + this.mockNMClient = mockNMClient; + this.mockResourceManagerClient = mockResourceManagerClient; + } + + public void 
runInMainThread(Runnable runnable) { + super.getMainThreadExecutor().execute(runnable); + } + + @Override + protected AMRMClientAsync createAndStartResourceManagerClient() { + return mockResourceManagerClient; + } + + @Override + protected NMClient createAndStartNodeManagerClient() { + return mockNMClient; + } + } + + static class Context { + + // services + TestingRpcService rpcService; + TestingFatalErrorHandler fatalErrorHandler; + MockResourceManagerRuntimeServices rmServices; + + // RM + ResourceManagerConfiguration rmConfiguration; + ResourceID rmResourceID; + static final String RM_ADDRESS = "resourceManager"; + TestingYarnResourceManager resourceManager; + + // domain objects for test purposes + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200); + + public ContainerId task = ContainerId.newInstance( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1); + public String taskHost = "host1"; + + SlotReport slotReport = new SlotReport(); + + public NMClient mockNMClient = mock(NMClient.class); + public AMRMClientAsync mockResourceManagerClient = + mock(AMRMClientAsync.class); + + /** + * Create mock RM dependencies. + */ + Context() throws Exception { + rpcService = new TestingRpcService(); + fatalErrorHandler = new TestingFatalErrorHandler(); + rmServices = new MockResourceManagerRuntimeServices(); + + // resource manager + rmConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L)); + rmResourceID = ResourceID.generate(); + resourceManager = + new TestingYarnResourceManager( + rpcService, + RM_ADDRESS, + rmResourceID, + flinkConfig, + env, + rmConfiguration, + rmServices.highAvailabilityServices, + rmServices.heartbeatServices, + rmServices.slotManager, + rmServices.metricRegistry, + rmServices.jobLeaderIdService, + fatalErrorHandler, + mockResourceManagerClient, + mockNMClient + ); + } + + /** + * Mock services needed by the resource manager. 
+ */ + class MockResourceManagerRuntimeServices { + + public final ScheduledExecutor scheduledExecutor; + public final TestingHighAvailabilityServices highAvailabilityServices; + public final HeartbeatServices heartbeatServices; + public final MetricRegistry metricRegistry; + public final TestingLeaderElectionService rmLeaderElectionService; + public final JobLeaderIdService jobLeaderIdService; + public final SlotManager slotManager; + + public UUID rmLeaderSessionId; + + MockResourceManagerRuntimeServices() throws Exception { + scheduledExecutor = mock(ScheduledExecutor.class); + highAvailabilityServices = new TestingHighAvailabilityServices(); + rmLeaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor); + metricRegistry = mock(MetricRegistry.class); + slotManager = new SlotManager( + new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()), + Time.seconds(10), Time.seconds(10), Time.minutes(1)); + jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + Time.minutes(5L)); + } + + public void grantLeadership() throws Exception { + rmLeaderSessionId = UUID.randomUUID(); + rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + } + + /** + * Start the resource manager and grant leadership to it. + */ + public void startResourceManager() throws Exception { + resourceManager.start(); + rmServices.grantLeadership(); + } + + /** + * Stop the Akka actor system. 
+ */ + public void stopResourceManager() throws Exception { + rpcService.stopService(); + } + } + + static class TestContainer extends UtilsTest.TestingContainer { + Resource resource; + Priority priority; + + TestContainer(String id, String host) { + super(id, host); + } + + @Override + public Resource getResource() { + return resource; + } + + @Override + public void setResource(Resource resource) { + this.resource = resource; + } + + @Override + public Priority getPriority() { + return priority; + } + + @Override + public void setPriority(Priority priority) { + this.priority = priority; + } + } + + @Test + public void testStopWorker() throws Exception { + new Context() {{ + startResourceManager(); + // Request slot from SlotManager. + resourceManager.runInMainThread(() -> { + try { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + } catch (SlotManagerException e) { + e.printStackTrace(); + } + }); + + // Callback from YARN when container is allocated. + Container testingContainer = new TestContainer(task.toString(), taskHost); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + + // Remote task executor registers with YarnResourceManager. + TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); + rpcService.registerGateway(taskHost, mockTaskExecutorGateway); + resourceManager.registerTaskExecutor(taskHost, + new ResourceID(testingContainer.getId().toString()), + slotReport, + Time.seconds(10)).handleAsync( + (RegistrationResponse response, Throwable throwable) -> { + assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 1); --- End diff -- Maybe we should run this assertion in the main thread, since it accesses the RM's internal state (the SlotManager). ---