openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chet...@apache.org
Subject [openwhisk] branch master updated: Pod template support with KubernetesContainerFactory (#4690)
Date Thu, 17 Oct 2019 05:17:08 GMT
This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bf79ef  Pod template support with KubernetesContainerFactory (#4690)
8bf79ef is described below

commit 8bf79efa0c5fa5f83a1bf60604f8c85abcd32ac2
Author: Chetan Mehrotra <chetanm@apache.org>
AuthorDate: Thu Oct 17 10:46:57 2019 +0530

    Pod template support with KubernetesContainerFactory (#4690)
    
    * Refactor existing pod builder logic to a separate class and add tests
    * Add support for extending template
    * Log the pod spec yaml if debug mode is on
    * Document pod template
    * Integration test for template support
    * Implement a config map value reader which can be passed literal string or file reference
    * Skip test run if KUBECONFIG is not defined
---
 common/scala/build.gradle                          |   2 +-
 .../apache/openwhisk/common/ConfigMapValue.scala   |  46 +++++++
 core/invoker/src/main/resources/application.conf   |   6 +
 .../kubernetes/KubernetesClient.scala              |  61 ++--------
 .../containerpool/kubernetes/WhiskPodBuilder.scala | 119 ++++++++++++++++++
 settings.gradle                                    |   3 +-
 tests/build.gradle                                 |   2 +
 .../openwhisk/common/ConfigMapValueTests.scala     |  70 +++++++++++
 .../kubernetes/test/KubeClientSupport.scala        |  60 +++++++++
 .../kubernetes/test/WhiskPodBuilderTests.scala     | 134 +++++++++++++++++++++
 .../openwhisk/standalone/StandaloneKCFTests.scala  |  50 ++++++--
 11 files changed, 491 insertions(+), 62 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 4a6bced..f446072 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -59,7 +59,7 @@ dependencies {
     compile ('com.fasterxml.uuid:java-uuid-generator:3.1.3')
     compile 'com.github.ben-manes.caffeine:caffeine:2.6.2'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
-    compile 'io.fabric8:kubernetes-client:4.4.2'
+    compile "io.fabric8:kubernetes-client:${gradle.kube_client.version}"
     compile ('io.kamon:kamon-core_2.12:1.1.3') {
         exclude group: 'com.lihaoyi'
     }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/ConfigMapValue.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/ConfigMapValue.scala
new file mode 100644
index 0000000..4ff7e7f
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/ConfigMapValue.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.commons.io.FileUtils
+import pureconfig.ConfigReader
+import pureconfig.ConvertHelpers.catchReadError
+
+class ConfigMapValue private (val value: String)
+
+object ConfigMapValue {
+
+  /**
+   * Checks if the value is a file url like `file:/etc/config/foo.yaml` then treat it as
a file reference
+   * and read its content otherwise consider it as a literal value
+   */
+  def apply(config: String): ConfigMapValue = {
+    val value = if (config.startsWith("file:")) {
+      val uri = new URI(config)
+      val file = new File(uri)
+      FileUtils.readFileToString(file, UTF_8)
+    } else config
+    new ConfigMapValue(value)
+  }
+
+  implicit val reader: ConfigReader[ConfigMapValue] = ConfigReader.fromString[ConfigMapValue](catchReadError(apply))
+}
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 9afba4d..b518756 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -81,6 +81,12 @@ whisk {
     # Enables forwarding to remote port via a local random port. This mode is mostly useful
     # for development via Standalone mode
     port-forwarding-enabled = false
+
+    # Pod template used as base for Action Pods created. It can be either
+    #  1. Reference to file `file:/path/to/template.yml`
+    #  2. OR yaml formatted multi line string. See multi line config support https://github.com/lightbend/config/blob/master/HOCON.md#multi-line-strings
+    #
+    #pod-template =
   }
 
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
index 9f24938..87a9a59 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -30,10 +30,11 @@ import akka.stream.stage._
 import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
 import akka.util.ByteString
 import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.utils.Serialization
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 import okhttp3.{Call, Callback, Request, Response}
 import okio.BufferedSource
-import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.common.{ConfigMapValue, Logging, TransactionId}
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.containerpool.docker.ProcessRunner
 import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
@@ -44,7 +45,6 @@ import spray.json.DefaultJsonProtocol._
 import spray.json._
 
 import scala.annotation.tailrec
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -75,7 +75,8 @@ case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String,
value: S
 case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
                                   invokerAgent: KubernetesInvokerAgentConfig,
                                   userPodNodeAffinity: KubernetesInvokerNodeAffinity,
-                                  portForwardingEnabled: Boolean)
+                                  portForwardingEnabled: Boolean,
+                                  podTemplate: Option[ConfigMapValue] = None)
 
 /**
  * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking
the kubectl CLI.
@@ -98,62 +99,18 @@ class KubernetesClient(
       .withRequestTimeout(config.timeouts.logs.toMillis.toInt)
       .build())
 
+  private val podBuilder = new WhiskPodBuilder(kubeRestClient, config.userPodNodeAffinity,
config.podTemplate)
+
   def run(name: String,
           image: String,
           memory: ByteSize = 256.MB,
           environment: Map[String, String] = Map.empty,
           labels: Map[String, String] = Map.empty)(implicit transid: TransactionId): Future[KubernetesContainer]
= {
 
-    val envVars = environment.map {
-      case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
-    }.toSeq
-
-    val podBuilder = new PodBuilder()
-      .withNewMetadata()
-      .withName(name)
-      .addToLabels("name", name)
-      .addToLabels(labels.asJava)
-      .endMetadata()
-      .withNewSpec()
-      .withRestartPolicy("Always")
-    if (config.userPodNodeAffinity.enabled) {
-      val invokerNodeAffinity = new AffinityBuilder()
-        .withNewNodeAffinity()
-        .withNewRequiredDuringSchedulingIgnoredDuringExecution()
-        .addNewNodeSelectorTerm()
-        .addNewMatchExpression()
-        .withKey(config.userPodNodeAffinity.key)
-        .withOperator("In")
-        .withValues(config.userPodNodeAffinity.value)
-        .endMatchExpression()
-        .endNodeSelectorTerm()
-        .endRequiredDuringSchedulingIgnoredDuringExecution()
-        .endNodeAffinity()
-        .build()
-      podBuilder.withAffinity(invokerNodeAffinity)
+    val pod = podBuilder.buildPodSpec(name, image, memory, environment, labels)
+    if (transid.meta.extraLogging) {
+      log.info(this, s"Pod spec being created\n${Serialization.asYaml(pod)}")
     }
-    val secContext = new SecurityContextBuilder()
-      .withNewCapabilities()
-      .addToDrop("NET_RAW", "NET_ADMIN")
-      .endCapabilities()
-      .build()
-    val pod = podBuilder
-      .addNewContainer()
-      .withNewResources()
-      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
-      .endResources()
-      .withSecurityContext(secContext)
-      .withName("user-action")
-      .withImage(image)
-      .withEnv(envVars.asJava)
-      .addNewPort()
-      .withContainerPort(8080)
-      .withName("action")
-      .endPort()
-      .endContainer()
-      .endSpec()
-      .build()
-
     val namespace = kubeRestClient.getNamespace
     kubeRestClient.pods.inNamespace(namespace).create(pod)
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
new file mode 100644
index 0000000..ca7627a
--- /dev/null
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes
+
+import java.io.ByteArrayInputStream
+import java.nio.charset.StandardCharsets.UTF_8
+
+import io.fabric8.kubernetes.api.builder.Predicate
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, Pod, PodBuilder,
Quantity}
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient
+import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.core.entity.ByteSize
+
+import scala.collection.JavaConverters._
+
+class WhiskPodBuilder(client: NamespacedKubernetesClient,
+                      userPodNodeAffinity: KubernetesInvokerNodeAffinity,
+                      podTemplate: Option[ConfigMapValue] = None) {
+  private val template = podTemplate.map(_.value.getBytes(UTF_8))
+  private val actionContainerName = "user-action"
+  private val actionContainerPredicate: Predicate[ContainerBuilder] = (cb) => cb.getName
== actionContainerName
+
+  def affinityEnabled: Boolean = userPodNodeAffinity.enabled
+
+  def buildPodSpec(name: String,
+                   image: String,
+                   memory: ByteSize,
+                   environment: Map[String, String],
+                   labels: Map[String, String])(implicit transid: TransactionId): Pod = {
+    val envVars = environment.map {
+      case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
+    }.toSeq
+
+    val baseBuilder = template match {
+      case Some(bytes) =>
+        new PodBuilder(loadPodSpec(bytes))
+      case None => new PodBuilder()
+    }
+
+    val pb1 = baseBuilder
+      .editOrNewMetadata()
+      .withName(name)
+      .addToLabels("name", name)
+      .addToLabels(labels.asJava)
+      .endMetadata()
+
+    val specBuilder = pb1.editOrNewSpec().withRestartPolicy("Always")
+
+    if (userPodNodeAffinity.enabled) {
+      val affinity = specBuilder
+        .editOrNewAffinity()
+        .editOrNewNodeAffinity()
+        .editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
+      affinity
+        .addNewNodeSelectorTerm()
+        .addNewMatchExpression()
+        .withKey(userPodNodeAffinity.key)
+        .withOperator("In")
+        .withValues(userPodNodeAffinity.value)
+        .endMatchExpression()
+        .endNodeSelectorTerm()
+        .endRequiredDuringSchedulingIgnoredDuringExecution()
+        .endNodeAffinity()
+        .endAffinity()
+    }
+
+    val containerBuilder = if (specBuilder.hasMatchingContainer(actionContainerPredicate))
{
+      specBuilder.editMatchingContainer(actionContainerPredicate)
+    } else specBuilder.addNewContainer()
+
+    //In container its assumed that env, port, resource limits are set explicitly
+    //Here if any value exist in template then that would be overridden
+    containerBuilder
+      .withNewResources()
+      .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
+      .endResources()
+      .withName("user-action")
+      .withImage(image)
+      .withEnv(envVars.asJava)
+      .addNewPort()
+      .withContainerPort(8080)
+      .withName("action")
+      .endPort()
+
+    //If any existing context entry is present then "update" it else add new
+    containerBuilder
+      .editOrNewSecurityContext()
+      .editOrNewCapabilities()
+      .addToDrop("NET_RAW", "NET_ADMIN")
+      .endCapabilities()
+      .endSecurityContext()
+
+    val pod = containerBuilder
+      .endContainer()
+      .endSpec()
+      .build()
+    pod
+  }
+
+  private def loadPodSpec(bytes: Array[Byte]): Pod = {
+    val resources = client.load(new ByteArrayInputStream(bytes))
+    resources.get().get(0).asInstanceOf[Pod]
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 25764ac..480daf8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -48,4 +48,5 @@ gradle.ext.akka = [version : '2.5.22']
 gradle.ext.akka_kafka = [version : '1.0.5']
 gradle.ext.akka_http = [version : '10.1.8']
 
-gradle.ext.curator = [version:'4.0.0']
+gradle.ext.curator = [version : '4.0.0']
+gradle.ext.kube_client = [version: '4.4.2']
diff --git a/tests/build.gradle b/tests/build.gradle
index ac27820..2b62ab5 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -201,6 +201,8 @@ dependencies {
     compile 'com.atlassian.oai:swagger-request-validator-core:1.4.5'
     compile "com.typesafe.akka:akka-stream-kafka-testkit_2.12:${gradle.akka_kafka.version}"
     compile "com.typesafe.akka:akka-stream-testkit_2.12:${gradle.akka.version}"
+    compile "com.typesafe.akka:akka-stream-testkit_2.12:${gradle.akka.version}"
+    compile "io.fabric8:kubernetes-server-mock:${gradle.kube_client.version}"
 
     compile "com.amazonaws:aws-java-sdk-s3:1.11.295"
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/ConfigMapValueTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/ConfigMapValueTests.scala
new file mode 100644
index 0000000..6067a28
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/ConfigMapValueTests.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import pureconfig.loadConfigOrThrow
+
+@RunWith(classOf[JUnitRunner])
+class ConfigMapValueTests extends FlatSpec with Matchers {
+  behavior of "ConfigMapValue"
+
+  case class ValueTest(template: ConfigMapValue, count: Int)
+
+  it should "read from string" in {
+    val config = ConfigFactory.parseString("""
+       |whisk {
+       |  value-test {
+       |    template = "test string"
+       |    count = 42
+       |  }
+       |}""".stripMargin)
+
+    val valueTest = readValueTest(config)
+    valueTest.template.value shouldBe "test string"
+  }
+
+  it should "read from file reference" in {
+    val file = Files.createTempFile("whisk", null).toFile
+    FileUtils.write(file, "test string", UTF_8)
+
+    val config = ConfigFactory.parseString(s"""
+       |whisk {
+       |  value-test {
+       |    template = "${file.toURI}"
+       |    count = 42
+       |  }
+       |}""".stripMargin)
+
+    val valueTest = readValueTest(config)
+    valueTest.template.value shouldBe "test string"
+
+    file.delete()
+  }
+
+  private def readValueTest(config: com.typesafe.config.Config) = {
+    loadConfigOrThrow[ValueTest](config.getConfig("whisk.value-test"))
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
new file mode 100644
index 0000000..0a64cd5
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubeClientSupport.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes.test
+
+import common.StreamLogging
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+import org.scalatest.{BeforeAndAfterAll, Suite, TestSuite}
+
+import scala.concurrent.duration._
+
+trait KubeClientSupport extends TestSuite with BeforeAndAfterAll with StreamLogging {
+  self: Suite =>
+
+  protected def useMockServer = true
+
+  protected lazy val (kubeClient, closeable) = {
+    if (useMockServer) {
+      val server = new KubernetesMockServer(false)
+      server.init()
+      (server.createClient(), () => server.destroy())
+    } else {
+      val client = new DefaultKubernetesClient(
+        new ConfigBuilder()
+          .withConnectionTimeout(1.minute.toMillis.toInt)
+          .withRequestTimeout(1.minute.toMillis.toInt)
+          .build())
+      (client, () => client.close())
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    if (!useMockServer) {
+      val kubeconfig = sys.env.get("KUBECONFIG")
+      assume(kubeconfig.isDefined, "KUBECONFIG env must be defined")
+      println(s"Using kubeconfig from ${kubeconfig.get}")
+    }
+    super.beforeAll()
+  }
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    closeable.apply()
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
new file mode 100644
index 0000000..5c6eee2
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.kubernetes.test
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.utils.Serialization
+import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
+import org.apache.openwhisk.core.containerpool.kubernetes.{KubernetesInvokerNodeAffinity,
WhiskPodBuilder}
+import org.apache.openwhisk.core.entity.size._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[JUnitRunner])
+class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport {
+  implicit val tid: TransactionId = TransactionId.testing
+  private val testImage = "nodejs"
+  private val memLimit = 10.MB
+  private val name = "whisk"
+  private val affinity = KubernetesInvokerNodeAffinity(enabled = true, "openwhisk-role",
"invoker")
+
+  behavior of "WhiskPodBuilder"
+
+  it should "build a new pod" in {
+    val builder = new WhiskPodBuilder(kubeClient, affinity)
+    assertPodSettings(builder)
+  }
+
+  it should "extend existing pod template" in {
+    val template = """
+       |---
+       |apiVersion: "v1"
+       |kind: "Pod"
+       |metadata:
+       |  annotations:
+       |    my-foo : my-bar
+       |  labels:
+       |    my-fool : my-barv
+       |  name: "testpod"
+       |  namespace: whiskns
+       |spec:
+       |  containers:
+       |    - name: "user-action"
+       |      securityContext:
+       |        capabilities:
+       |          drop:
+       |          - "TEST_CAP"
+       |    - name: "sidecar"
+       |      image : "busybox"
+       |""".stripMargin
+
+    val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = false), Some(ConfigMapValue(template)))
+    val pod = assertPodSettings(builder)
+
+    val ac = getActionContainer(pod)
+    ac.getSecurityContext.getCapabilities.getDrop.asScala should contain("TEST_CAP")
+
+    val sc = pod.getSpec.getContainers.asScala.find(_.getName == "sidecar").get
+    sc.getImage shouldBe "busybox"
+
+    pod.getMetadata.getLabels.asScala.get("my-fool") shouldBe Some("my-barv")
+    pod.getMetadata.getAnnotations.asScala.get("my-foo") shouldBe Some("my-bar")
+    pod.getMetadata.getNamespace shouldBe "whiskns"
+  }
+
+  it should "extend existing pod template with affinity" in {
+    val template = """
+       |apiVersion: "v1"
+       |kind: "Pod"
+       |spec:
+       |  affinity:
+       |    nodeAffinity:
+       |      requiredDuringSchedulingIgnoredDuringExecution:
+       |        nodeSelectorTerms:
+       |        - matchExpressions:
+       |          - key: "nodelabel"
+       |            operator: "In"
+       |            values:
+       |            - "test"""".stripMargin
+
+    val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = true), Some(ConfigMapValue(template)))
+    val pod = assertPodSettings(builder)
+
+    val terms =
+      pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution.getNodeSelectorTerms.asScala
+    terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == "nodelabel")) shouldBe
true
+  }
+
+  private def assertPodSettings(builder: WhiskPodBuilder): Pod = {
+    val pod = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), Map("fooL"
-> "barV"))
+    withClue(Serialization.asYaml(pod)) {
+      val c = getActionContainer(pod)
+      c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
+
+      c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe Some("10Mi")
+      c.getSecurityContext.getCapabilities.getDrop.asScala should contain allOf ("NET_RAW",
"NET_ADMIN")
+      c.getPorts.asScala.find(_.getName == "action").map(_.getContainerPort) shouldBe Some(8080)
+      c.getImage shouldBe testImage
+
+      pod.getMetadata.getLabels.asScala.get("name") shouldBe Some(name)
+      pod.getMetadata.getLabels.asScala.get("fooL") shouldBe Some("barV")
+      pod.getMetadata.getName shouldBe name
+      pod.getSpec.getRestartPolicy shouldBe "Always"
+
+      if (builder.affinityEnabled) {
+        val terms =
+          pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution.getNodeSelectorTerms.asScala
+        terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == affinity.key)) shouldBe
true
+      }
+    }
+    pod
+  }
+
+  private def getActionContainer(pod: Pod) = {
+    pod.getSpec.getContainers.asScala.find(_.getName == "user-action").get
+  }
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
b/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
index acf4106..bb5a0c2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/standalone/StandaloneKCFTests.scala
@@ -17,13 +17,22 @@
 
 package org.apache.openwhisk.standalone
 
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
 import common.WskProps
+import org.apache.commons.io.FileUtils
+import org.apache.openwhisk.core.containerpool.kubernetes.test.KubeClientSupport
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import system.basic.WskRestBasicTests
 
 @RunWith(classOf[JUnitRunner])
-class StandaloneKCFTests extends WskRestBasicTests with StandaloneServerFixture with StandaloneSanityTestSupport
{
+class StandaloneKCFTests
+    extends WskRestBasicTests
+    with StandaloneServerFixture
+    with StandaloneSanityTestSupport
+    with KubeClientSupport {
   override implicit val wskprops = WskProps().copy(apihost = serverUrl)
 
   //Turn on to debug locally easily
@@ -31,17 +40,42 @@ class StandaloneKCFTests extends WskRestBasicTests with StandaloneServerFixture
 
   override protected val dumpStartupLogs = false
 
+  override protected def useMockServer = false
+
   override protected def supportedTests = Set("Wsk Action REST should invoke a blocking action
and get only the result")
 
   override protected def extraArgs: Seq[String] = Seq("--dev-mode", "--dev-kcf")
 
-  override def beforeAll(): Unit = {
-    val kubeconfig = sys.env.get("KUBECONFIG")
-    require(kubeconfig.isDefined, "KUBECONFIG env must be defined")
-    println(s"Using kubeconfig from ${kubeconfig.get}")
+  private val podTemplate = """---
+                              |apiVersion: "v1"
+                              |kind: "Pod"
+                              |metadata:
+                              |  annotations:
+                              |    allow-outbound : "true"
+                              |  labels:
+                              |     launcher: standalone""".stripMargin
+
+  private val podTemplateFile = Files.createTempFile("whisk", null).toFile
+
+  override val customConfig = {
+    FileUtils.write(podTemplateFile, podTemplate, UTF_8)
+    Some(s"""include classpath("standalone-kcf.conf")
+         |
+         |whisk {
+         |  kubernetes {
+         |    pod-template = "${podTemplateFile.toURI}"
+         |  }
+         |}""".stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    checkPodState()
+    super.afterAll()
+    podTemplateFile.delete()
+  }
 
-    //Note the context need to specify default namespace
-    //kubectl config set-context --current --namespace=default
-    super.beforeAll()
+  def checkPodState(): Unit = {
+    val podList = kubeClient.pods().withLabel("launcher").list()
+    podList.getItems.isEmpty shouldBe false
   }
 }


Mime
View raw message