openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dgr...@apache.org
Subject [openwhisk] branch master updated: remove experimental KubernetesClientWithInvokerAgent (#4785)
Date Fri, 03 Jan 2020 15:13:54 GMT
This is an automated email from the ASF dual-hosted git repository.

dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e7b6ec  remove experimental KubernetesClientWithInvokerAgent (#4785)
1e7b6ec is described below

commit 1e7b6ec845f20dec1dff0ce1649d09b1c9f131d1
Author: David Grove <dgrove-oss@users.noreply.github.com>
AuthorDate: Fri Jan 3 10:13:42 2020 -0500

    remove experimental KubernetesClientWithInvokerAgent (#4785)
    
    Remove experimental KubernetesClientWithInvokerAgent implementation of
    the ContainerFactory SPI to simplify the codebase.  This code was an
    exploration of implementing pause/unpause functionality when using the
    KubernetesContainerFactory, but was never implemented to a useful
    level of functionality.  Most seriously, if an invoker or invoker
    agent crashed it would leave orphaned suspended pods that Kubernetes
    was not able to cleanly remove without cluster admin-level
    intervention since they had been paused "behind the back" of the
    Kubernetes scheduler using low-level mechanisms.  A secondary
    consideration is that the actual invoker agent (go code in the
    deploy-kube project) was only implemented for the case when the
    underlying container engine was Docker. Given the subsequent shift
    away from Docker to containerd or cri-o, the invoker agent did not
    support recent versions of Kubernetes or OpenShift.
---
 core/invoker/src/main/resources/application.conf   |   4 -
 .../kubernetes/KubernetesClient.scala              |   6 -
 .../KubernetesClientWithInvokerAgent.scala         | 123 ---------------------
 .../kubernetes/KubernetesContainerFactory.scala    |   6 +-
 .../kubernetes/test/KubernetesClientTests.scala    |  26 -----
 5 files changed, 1 insertion(+), 164 deletions(-)

diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 0f59dd6..a998ecb 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -68,10 +68,6 @@ whisk {
       run: 1 minute
       logs: 1 minute
     }
-    invoker-agent {
-      enabled: false
-      port: 3233
-    }
     user-pod-node-affinity {
       enabled: true
       key: "openwhisk-role"
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 5cdaadd..c1a9357 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -58,11 +58,6 @@ import scala.util.{Failure, Success, Try}
 case class KubernetesClientTimeoutConfig(run: Duration, logs: Duration)
 
 /**
- * Configuration for kubernetes invoker-agent
- */
-case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)
-
-/**
  * Configuration for node affinity for the pods that execute user action containers
  * The key,value pair should match the <key,value> pair with which the invoker worker nodes
  * are labeled in the Kubernetes cluster.  The default pair is <openwhisk-role,invoker>,
@@ -74,7 +69,6 @@ case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: S
  * General configuration for kubernetes client
  */
 case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
-                                  invokerAgent: KubernetesInvokerAgentConfig,
                                   userPodNodeAffinity: KubernetesInvokerNodeAffinity,
                                   portForwardingEnabled: Boolean,
                                   actionNamespace: Option[String] = None,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
deleted file mode 100644
index cd890bc..0000000
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.openwhisk.core.containerpool.kubernetes
-
-import org.apache.openwhisk.common.{Logging, TransactionId}
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.model.Uri.Path
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MessageEntity, Uri}
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import pureconfig._
-import pureconfig.generic.auto._
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import org.apache.openwhisk.core.ConfigKeys
-
-import collection.JavaConverters._
-import scala.concurrent.{blocking, ExecutionContext, Future}
-
-/**
- * An extended kubernetes client that works in tandem with an invokerAgent DaemonSet with
- * instances running on every worker node that runs user containers to provide
- * suspend/resume capability.
- */
-class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
-                                         loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
-  executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem)
-    extends KubernetesClient(config)(executionContext)
-    with KubernetesApiWithInvokerAgent {
-
-  override def rm(key: String, value: String, ensureUnpaused: Boolean = false)(
-    implicit transid: TransactionId): Future[Unit] = {
-    if (ensureUnpaused) {
-      // The caller can't guarantee that every container with the label key=value is already unpaused.
-      // Therefore we must enumerate them and ensure they are unpaused before we attempt to delete them.
-      Future {
-        blocking {
-          kubeRestClient
-            .inNamespace(kubeRestClient.getNamespace)
-            .pods()
-            .withLabel(key, value)
-            .list()
-            .getItems
-            .asScala
-            .map { pod =>
-              val container = toContainer(pod)
-              container
-                .resume()
-                .recover { case _ => () } // Ignore errors; it is possible the container was not actually suspended.
-                .map(_ => rm(container))
-            }
-        }
-      }.flatMap(futures =>
-        Future
-          .sequence(futures)
-          .map(_ => ()))
-    } else {
-      super.rm(key, value, ensureUnpaused)
-    }
-  }
-
-  override def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
-    agentCommand("suspend", container)
-      .map(_.discardEntityBytes())
-  }
-
-  override def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
-    agentCommand("resume", container)
-      .map(_.discardEntityBytes())
-  }
-
-  override def agentCommand(command: String,
-                            container: KubernetesContainer,
-                            payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = {
-    val uri = Uri()
-      .withScheme("http")
-      .withHost(container.workerIP)
-      .withPort(config.invokerAgent.port)
-      .withPath(Path / command / container.nativeContainerId)
-
-    Marshal(payload).to[MessageEntity].flatMap { entity =>
-      Http().singleRequest(HttpRequest(uri = uri, entity = entity))
-    }
-  }
-
-  private def fieldsString(fields: Map[String, JsValue]) =
-    fields
-      .map {
-        case (key, value) => s""""$key":${value.compactPrint}"""
-      }
-      .mkString(",")
-}
-
-trait KubernetesApiWithInvokerAgent extends KubernetesApi {
-
-  /**
-   * Request the invokerAgent running on the container's worker node to execute the given command
-   * @param command The command verb to execute
-   * @param container The container to which the command should be applied
-   * @param payload The additional data needed to execute the command.
-   * @return The HTTPResponse from the remote agent.
-   */
-  def agentCommand(command: String,
-                   container: KubernetesContainer,
-                   payload: Option[Map[String, JsValue]] = None): Future[HttpResponse]
-
-}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index ece550c..a595c21 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -53,11 +53,7 @@ class KubernetesContainerFactory(
 
   private def initializeKubeClient(): KubernetesClient = {
     val config = loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes)
-    if (config.invokerAgent.enabled) {
-      new KubernetesClientWithInvokerAgent(config)(ec)
-    } else {
-      new KubernetesClient(config)(ec)
-    }
+    new KubernetesClient(config)(ec)
   }
 
   /** Perform cleanup on init */
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 0afaefa..310e571 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -20,7 +20,6 @@ package org.apache.openwhisk.core.containerpool.kubernetes.test
 import java.time.Instant
 
 import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpResponse
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Concat, Sink, Source}
 
@@ -37,7 +36,6 @@ import org.scalatest.Matchers
 import org.scalatest.time.{Seconds, Span}
 import common.{StreamLogging, WskActorSystem}
 import okio.Buffer
-import spray.json.{JsObject, JsValue}
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
 import org.apache.openwhisk.core.containerpool.kubernetes._
@@ -238,28 +236,4 @@ object KubernetesClientTests {
       Source(List.empty[TypedLogLine])
     }
   }
-
-  class TestKubernetesClientWithInvokerAgent(implicit as: ActorSystem)
-      extends TestKubernetesClient
-      with KubernetesApiWithInvokerAgent {
-    var agentCommands = mutable.Buffer.empty[(ContainerId, String, Option[Map[String, JsValue]])]
-    var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)]
-
-    def agentCommand(command: String,
-                     container: KubernetesContainer,
-                     payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = {
-      agentCommands += ((container.id, command, payload))
-      Future.successful(HttpResponse())
-    }
-
-    def forwardLogs(container: KubernetesContainer,
-                    lastOffset: Long,
-                    sizeLimit: ByteSize,
-                    sentinelledLogs: Boolean,
-                    additionalMetadata: Map[String, JsValue],
-                    augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = {
-      forwardLogs += ((container.id, lastOffset))
-      Future.successful(lastOffset + sizeLimit.toBytes) // for testing, pretend we read size limit bytes
-    }
-  }
 }


Mime
View raw message