openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From markusthoem...@apache.org
Subject [incubator-openwhisk] branch master updated: Remove play dependence. (#2438)
Date Fri, 14 Jul 2017 07:43:31 GMT
This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new aace138  Remove play dependence. (#2438)
aace138 is described below

commit aace138bb08c49b12ce64ba9dac175f3939c7cf5
Author: rodric rabbah <rodric@gmail.com>
AuthorDate: Fri Jul 14 03:43:28 2017 -0400

    Remove play dependence. (#2438)
---
 .../scala/whisk/core/entity/ActivationResult.scala |  18 +++-
 .../scala/whisk/core/container/HttpUtils.scala     |  27 ++++-
 .../whisk/core/container/WhiskContainer.scala      |   2 +-
 .../containerpool/docker/DockerContainer.scala     |   7 +-
 tests/build.gradle                                 |   1 -
 .../scala/actionContainers/ActionContainer.scala   |  23 +---
 tests/src/test/scala/common/AkkaHttpUtils.scala    | 116 ---------------------
 7 files changed, 44 insertions(+), 150 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 6d78dad..aef644e 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -19,6 +19,7 @@ package whisk.core.entity
 
 import scala.util.Try
 
+import spray.http.StatusCodes.OK
 import spray.json._
 import spray.json.DefaultJsonProtocol
 import whisk.common.Logging
@@ -96,12 +97,15 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol
{
     protected[core] case class ConnectionError(t: Throwable) extends ContainerConnectionError
     protected[core] case class NoResponseReceived() extends ContainerConnectionError
     protected[core] case class Timeout() extends ContainerConnectionError
+
     /**
-     * @param okStatus the container response was OK (HTTP 200 status code), anything else
is considered an error
+     * @param statusCode the container HTTP response code (e.g., 200 OK)
      * @param entity the entity response as string
      * @param truncated either None to indicate complete entity or Some(actual length, max
allowed)
      */
-    protected[core] case class ContainerResponse(okStatus: Boolean, entity: String, truncated:
Option[(ByteSize, ByteSize)] = None) {
+    protected[core] case class ContainerResponse(statusCode: Int, entity: String, truncated:
Option[(ByteSize, ByteSize)]) {
+        /** true iff status code is OK (HTTP 200 status code), anything else is considered
an error. **/
+        val okStatus = statusCode == OK.intValue
         val ok = okStatus && truncated.isEmpty
         override def toString = {
             val base = if (okStatus) "ok" else "not ok"
@@ -110,6 +114,12 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol
{
         }
     }
 
+    protected[core] object ContainerResponse {
+        def apply(okStatus: Boolean, entity: String, truncated: Option[(ByteSize, ByteSize)]
= None): ContainerResponse = {
+            ContainerResponse(if (okStatus) OK.intValue else 500, entity, truncated)
+        }
+    }
+
     /**
      * Interprets response from container after initialization. This method is only called
when the initialization failed.
      *
@@ -149,14 +159,14 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol
{
      */
     protected[core] def processRunResponseContent(response: Either[ContainerConnectionError,
ContainerResponse], logger: Logging): ActivationResponse = {
         response match {
-            case Right(ContainerResponse(okStatus, str, truncated)) => truncated match
{
+            case Right(res @ ContainerResponse(_, str, truncated)) => truncated match
{
                 case None =>
                     Try { str.parseJson.asJsObject } match {
                         case scala.util.Success(result @ JsObject(fields)) =>
                             // If the response is a JSON object container an error field,
accept it as the response error.
                             val errorOpt = fields.get(ERROR_FIELD)
 
-                            if (okStatus) {
+                            if (res.okStatus) {
                                 errorOpt map { error =>
                                     applicationError(error)
                                 } getOrElse {
diff --git a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala b/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
index e53ddb2..1530d69 100644
--- a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
+++ b/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
@@ -20,6 +20,7 @@ package whisk.core.container
 import java.nio.charset.StandardCharsets
 
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.DurationInt
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
@@ -34,7 +35,8 @@ import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
 import org.apache.http.impl.client.HttpClientBuilder
 
-import spray.json.JsObject
+import spray.json._
+
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
@@ -56,7 +58,7 @@ protected[core] class HttpUtils(
     timeout: FiniteDuration,
     maxResponse: ByteSize) {
 
-    def close = Try(connection.close)
+    def close() = Try(connection.close())
 
     /**
      * Posts to hostname/endpoint the given JSON object.
@@ -66,11 +68,11 @@ protected[core] class HttpUtils(
      * wait longer than the total timeout (within a small slack allowance).
      *
      * @param endpoint the path the api call relative to hostname
-     * @param body the json object to post
+     * @param body the JSON value to post (this is usually a JSON objecT)
      * @param retry whether or not to retry on connection failure
      * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
      */
-    def post(endpoint: String, body: JsObject, retry: Boolean): Either[ContainerConnectionError,
ContainerResponse] = {
+    def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerConnectionError,
ContainerResponse] = {
         val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
         entity.setContentType("application/json")
 
@@ -92,7 +94,7 @@ protected[core] class HttpUtils(
                     val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead)
                     val str = new String(bytes, StandardCharsets.UTF_8)
                     val truncated = if (contentLength <= maxResponseBytes) None else Some(contentLength.B,
maxResponse)
-                    Right(ContainerResponse(statusCode == 200, str, truncated))
+                    Right(ContainerResponse(statusCode, str, truncated))
                 } else {
                     Left(NoResponseReceived())
                 }
@@ -135,3 +137,18 @@ protected[core] class HttpUtils(
         .useSystemProperties()
         .build
 }
+
+object HttpUtils {
+    /** A helper method to post one single request to a connection. Used for container tests.
*/
+    def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject])
= {
+        val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
+        val response = connection.post(endPoint, content, retry = true)
+        connection.close()
+        response match {
+            case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
+            case Left(Timeout()) => throw new java.util.concurrent.TimeoutException()
+            case Left(ConnectionError(t: java.net.SocketTimeoutException)) => throw new
java.util.concurrent.TimeoutException()
+            case _ => throw new IllegalStateException()
+        }
+    }
+}
diff --git a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
index 4685693..49a4639 100644
--- a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
@@ -120,7 +120,7 @@ class WhiskContainer(
      * Tear down the container and retrieve the logs.
      */
     def teardown()(implicit transid: TransactionId): String = {
-        connection.foreach(_.close)
+        connection.foreach(_.close())
         getContainerLogs(containerName).toOption.getOrElse("none")
     }
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 4310f91..869c227 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -76,7 +76,7 @@ object DockerContainer {
             case (key, value) => Seq("-e", s"$key=$value")
         }.flatten
 
-        val dnsArgs = dnsServers.map(Seq("--dns",_)).flatten
+        val dnsArgs = dnsServers.map(Seq("--dns", _)).flatten
 
         val args = Seq(
             "--cap-drop", "NET_RAW",
@@ -137,7 +137,10 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(
 
     def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id)
     def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id)
-    def destroy()(implicit transid: TransactionId): Future[Unit] = docker.rm(id)
+    def destroy()(implicit transid: TransactionId): Future[Unit] = {
+        httpConnection.foreach(_.close())
+        docker.rm(id)
+    }
 
     def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId):
Future[Interval] = {
         val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending
initialization to $id $ip")
diff --git a/tests/build.gradle b/tests/build.gradle
index 3a07e70..b33e2c6 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -52,7 +52,6 @@ dependencies {
     compile 'com.typesafe.akka:akka-testkit_2.11:2.4.16'
     compile 'com.google.code.gson:gson:2.3.1'
     compile 'org.scalamock:scalamock-scalatest-support_2.11:3.4.2'
-    compile 'com.typesafe.play:play-ws_2.11:2.5.11'
 
     compile project(':common:scala')
     compile project(':core:controller')
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 978114d..e10c792 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,7 +31,6 @@ import scala.language.postfixOps
 import scala.sys.process.ProcessLogger
 import scala.sys.process.stringToProcess
 import scala.util.Random
-import scala.util.Try
 
 import org.apache.commons.lang3.StringUtils
 import org.scalatest.FlatSpec
@@ -159,26 +158,8 @@ object ActionContainer {
         }
     }
 
-    private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)(
-        implicit actorSystem: ActorSystem): (Int, Option[JsObject]) = {
-        import akka.http.scaladsl.model._
-        import akka.http.scaladsl.unmarshalling._
-        import akka.stream.ActorMaterializer
-        import common.AkkaHttpUtils
-
-        implicit val materializer = ActorMaterializer()
-
-        val uri = Uri(
-            scheme = "http",
-            authority = Uri.Authority(host = Uri.Host(host), port = port),
-            path = Uri.Path(endPoint))
-
-        val f = for (
-            response <- AkkaHttpUtils.singleRequest(uri.toString(), content, 90.seconds,
retryOnTCPErrors = true);
-            responseBody <- Unmarshal(response.body).to[String]
-        ) yield (response.status.intValue, Try(responseBody.parseJson.asJsObject).toOption)
-
-        Await.result(f, 90.seconds)
+    private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int,
Option[JsObject]) = {
+        whisk.core.container.HttpUtils.post(host, port, endPoint, content)
     }
 
     private class ActionContainerImpl() extends ActionContainer {
diff --git a/tests/src/test/scala/common/AkkaHttpUtils.scala b/tests/src/test/scala/common/AkkaHttpUtils.scala
deleted file mode 100644
index 46faa69..0000000
--- a/tests/src/test/scala/common/AkkaHttpUtils.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package common
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model._
-import akka.stream.ActorMaterializer
-import play.api.http.{ContentTypeOf, Writeable}
-import play.api.libs.ws.WSResponse
-import play.api.libs.ws.ahc.AhcWSClient
-import play.api.mvc.Codec
-import spray.json.JsValue
-
-import scala.concurrent.{Await, Future, Promise}
-import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
-import scala.util.Try
-
-object AkkaHttpUtils {
-
-    // Writable for spray-json is required
-    implicit def sprayJsonContentType(implicit codec: Codec): ContentTypeOf[JsValue] = {
-        ContentTypeOf[JsValue](Some(ContentTypes.`application/json`.toString()))
-    }
-    implicit def sprayJsonWriteable(implicit codec: Codec): Writeable[JsValue] = {
-        Writeable(message => codec.encode(message.toString()))
-    }
-
-    def singleRequestBlocking(
-        uri: String,
-        content: JsValue,
-        timeout: FiniteDuration,
-        retryOnTCPErrors: Boolean = false,
-        retryOn4xxErrors: Boolean = false,
-        retryOn5xxErrors: Boolean = false,
-        retryInterval: FiniteDuration = 100.milliseconds)
-        (implicit system: ActorSystem) : Try[WSResponse] = {
-
-        val f = singleRequest(
-            uri, content, timeout, retryOnTCPErrors, retryOn4xxErrors, retryOn5xxErrors,
retryInterval
-        )
-
-        // Duration.Inf is not an issue, since singleRequest has a built-in timeout mechanism.
-        Await.ready(f, Duration.Inf)
-
-        f.value.get
-    }
-
-    // Makes a request, expects a successful within timeout, retries on selected
-    // errors until timeout has passed.
-    def singleRequest(
-         uri: String,
-         content: JsValue,
-         timeout: FiniteDuration,
-         retryOnTCPErrors: Boolean = false,
-         retryOn4xxErrors: Boolean = false,
-         retryOn5xxErrors: Boolean = false,
-         retryInterval: FiniteDuration = 100.milliseconds)
-        (implicit system: ActorSystem) : Future[WSResponse] = {
-        implicit val executionContext = system.dispatcher
-        implicit val materializer = ActorMaterializer()
-        val wsClient = AhcWSClient()
-
-        val timeoutException = new TimeoutException(s"Request to ${uri} could not be completed
in time.")
-
-        val promise = Promise[WSResponse]
-
-        // Timeout includes all retries.
-        system.scheduler.scheduleOnce(timeout) {
-            promise.tryFailure(timeoutException)
-        }
-
-        def tryOnce() : Unit = if(!promise.isCompleted) {
-            val f = wsClient.url(uri).withRequestTimeout(timeout).post(content)
-            f.onSuccess {
-                case r if r.status >= 400 && r.status < 500 && retryOn4xxErrors
=>
-                    system.scheduler.scheduleOnce(retryInterval) { tryOnce() }
-                case r if r.status >= 500 && r.status < 600 && retryOn5xxErrors
=>
-                    system.scheduler.scheduleOnce(retryInterval) { tryOnce() }
-                case r =>
-                    wsClient.close()
-                    promise.trySuccess(r)
-            }
-
-            f.onFailure {
-                case s : java.net.ConnectException if retryOnTCPErrors =>
-                    // TCP error (e.g. connection couldn't be opened)
-                    system.scheduler.scheduleOnce(retryInterval) { tryOnce() }
-
-                case t : Throwable =>
-                    // Other error. We fail the promise.
-                    promise.tryFailure(t)
-            }
-        }
-
-        tryOnce()
-
-        promise.future
-    }
-}

-- 
To stop receiving notification emails like this one, please contact
commits@openwhisk.apache.org.

Mime
View raw message