From reviews-return-652991-archive-asf-public=cust-asf.ponee.io@spark.apache.org Fri May 25 19:56:45 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 52700180627 for ; Fri, 25 May 2018 19:56:45 +0200 (CEST) Received: (qmail 23613 invoked by uid 500); 25 May 2018 17:56:44 -0000 Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@spark.apache.org Received: (qmail 23601 invoked by uid 99); 25 May 2018 17:56:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 May 2018 17:56:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3C90E0A9F; Fri, 25 May 2018 17:56:43 +0000 (UTC) From: liyinan926 To: reviews@spark.apache.org Reply-To: reviews@spark.apache.org References: In-Reply-To: Subject: [GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu... Content-Type: text/plain Message-Id: <20180525175643.A3C90E0A9F@git1-us-west.apache.org> Date: Fri, 25 May 2018 17:56:43 +0000 (UTC) Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190967916 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ + +private[spark] class ExecutorPodsPollingEventSource( + conf: SparkConf, + kubernetesClient: KubernetesClient, + eventQueue: ExecutorPodsEventQueue, + pollingExecutor: ScheduledExecutorService) { + + private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + + private var pollingFuture: Future[_] = _ + + def start(applicationId: String): Unit = { + require(pollingFuture == null, "Cannot start polling more than once.") + pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { + if (pollingFuture != null) { + pollingFuture.cancel(true) + pollingFuture = null + } + pollingExecutor.shutdown() --- End diff -- Oops. Yes, that's true. Let's have a utility method and do the following in the method: ``` executor.shutdown() executor.waitForTermination(30, TimeUnit.SECONDS) if (!executor.isShutdown()) { executor.shutdownNow() } ``` --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org