This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new aace138 Remove play dependence. (#2438)
aace138 is described below
commit aace138bb08c49b12ce64ba9dac175f3939c7cf5
Author: rodric rabbah <rodric@gmail.com>
AuthorDate: Fri Jul 14 03:43:28 2017 -0400
Remove play dependence. (#2438)
---
.../scala/whisk/core/entity/ActivationResult.scala | 18 +++-
.../scala/whisk/core/container/HttpUtils.scala | 27 ++++-
.../whisk/core/container/WhiskContainer.scala | 2 +-
.../containerpool/docker/DockerContainer.scala | 7 +-
tests/build.gradle | 1 -
.../scala/actionContainers/ActionContainer.scala | 23 +---
tests/src/test/scala/common/AkkaHttpUtils.scala | 116 ---------------------
7 files changed, 44 insertions(+), 150 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 6d78dad..aef644e 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -19,6 +19,7 @@ package whisk.core.entity
import scala.util.Try
+import spray.http.StatusCodes.OK
import spray.json._
import spray.json.DefaultJsonProtocol
import whisk.common.Logging
@@ -96,12 +97,15 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol
{
protected[core] case class ConnectionError(t: Throwable) extends ContainerConnectionError
protected[core] case class NoResponseReceived() extends ContainerConnectionError
protected[core] case class Timeout() extends ContainerConnectionError
+
/**
- * @param okStatus the container response was OK (HTTP 200 status code), anything else
is considered an error
+ * @param statusCode the container HTTP response code (e.g., 200 OK)
* @param entity the entity response as string
* @param truncated either None to indicate complete entity or Some(actual length, max
allowed)
*/
- protected[core] case class ContainerResponse(okStatus: Boolean, entity: String, truncated:
Option[(ByteSize, ByteSize)] = None) {
+ protected[core] case class ContainerResponse(statusCode: Int, entity: String, truncated:
Option[(ByteSize, ByteSize)]) {
+ /** true iff status code is OK (HTTP 200 status code), anything else is considered
an error. **/
+ val okStatus = statusCode == OK.intValue
val ok = okStatus && truncated.isEmpty
override def toString = {
val base = if (okStatus) "ok" else "not ok"
@@ -110,6 +114,12 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol
{
}
}
+ protected[core] object ContainerResponse {
+ def apply(okStatus: Boolean, entity: String, truncated: Option[(ByteSize, ByteSize)]
= None): ContainerResponse = {
+ ContainerResponse(if (okStatus) OK.intValue else 500, entity, truncated)
+ }
+ }
+
/**
* Interprets response from container after initialization. This method is only called
when the initialization failed.
*
@@ -149,14 +159,14 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol
{
*/
protected[core] def processRunResponseContent(response: Either[ContainerConnectionError,
ContainerResponse], logger: Logging): ActivationResponse = {
response match {
- case Right(ContainerResponse(okStatus, str, truncated)) => truncated match
{
+ case Right(res @ ContainerResponse(_, str, truncated)) => truncated match
{
case None =>
Try { str.parseJson.asJsObject } match {
case scala.util.Success(result @ JsObject(fields)) =>
// If the response is a JSON object container an error field,
accept it as the response error.
val errorOpt = fields.get(ERROR_FIELD)
- if (okStatus) {
+ if (res.okStatus) {
errorOpt map { error =>
applicationError(error)
} getOrElse {
diff --git a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala b/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
index e53ddb2..1530d69 100644
--- a/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
+++ b/core/invoker/src/main/scala/whisk/core/container/HttpUtils.scala
@@ -20,6 +20,7 @@ package whisk.core.container
import java.nio.charset.StandardCharsets
import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
import scala.util.Try
@@ -34,7 +35,8 @@ import org.apache.http.conn.HttpHostConnectException
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
-import spray.json.JsObject
+import spray.json._
+
import whisk.core.entity.ActivationResponse._
import whisk.core.entity.ByteSize
import whisk.core.entity.size.SizeLong
@@ -56,7 +58,7 @@ protected[core] class HttpUtils(
timeout: FiniteDuration,
maxResponse: ByteSize) {
- def close = Try(connection.close)
+ def close() = Try(connection.close())
/**
* Posts to hostname/endpoint the given JSON object.
@@ -66,11 +68,11 @@ protected[core] class HttpUtils(
* wait longer than the total timeout (within a small slack allowance).
*
* @param endpoint the path the api call relative to hostname
- * @param body the json object to post
+ * @param body the JSON value to post (this is usually a JSON objecT)
* @param retry whether or not to retry on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
*/
- def post(endpoint: String, body: JsObject, retry: Boolean): Either[ContainerConnectionError,
ContainerResponse] = {
+ def post(endpoint: String, body: JsValue, retry: Boolean): Either[ContainerConnectionError,
ContainerResponse] = {
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
entity.setContentType("application/json")
@@ -92,7 +94,7 @@ protected[core] class HttpUtils(
val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead)
val str = new String(bytes, StandardCharsets.UTF_8)
val truncated = if (contentLength <= maxResponseBytes) None else Some(contentLength.B,
maxResponse)
- Right(ContainerResponse(statusCode == 200, str, truncated))
+ Right(ContainerResponse(statusCode, str, truncated))
} else {
Left(NoResponseReceived())
}
@@ -135,3 +137,18 @@ protected[core] class HttpUtils(
.useSystemProperties()
.build
}
+
+object HttpUtils {
+ /** A helper method to post one single request to a connection. Used for container tests.
*/
+ def post(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject])
= {
+ val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
+ val response = connection.post(endPoint, content, retry = true)
+ connection.close()
+ response match {
+ case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
+ case Left(Timeout()) => throw new java.util.concurrent.TimeoutException()
+ case Left(ConnectionError(t: java.net.SocketTimeoutException)) => throw new
java.util.concurrent.TimeoutException()
+ case _ => throw new IllegalStateException()
+ }
+ }
+}
diff --git a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
index 4685693..49a4639 100644
--- a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala
@@ -120,7 +120,7 @@ class WhiskContainer(
* Tear down the container and retrieve the logs.
*/
def teardown()(implicit transid: TransactionId): String = {
- connection.foreach(_.close)
+ connection.foreach(_.close())
getContainerLogs(containerName).toOption.getOrElse("none")
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 4310f91..869c227 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -76,7 +76,7 @@ object DockerContainer {
case (key, value) => Seq("-e", s"$key=$value")
}.flatten
- val dnsArgs = dnsServers.map(Seq("--dns",_)).flatten
+ val dnsArgs = dnsServers.map(Seq("--dns", _)).flatten
val args = Seq(
"--cap-drop", "NET_RAW",
@@ -137,7 +137,10 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(
def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id)
def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id)
- def destroy()(implicit transid: TransactionId): Future[Unit] = docker.rm(id)
+ def destroy()(implicit transid: TransactionId): Future[Unit] = {
+ httpConnection.foreach(_.close())
+ docker.rm(id)
+ }
def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId):
Future[Interval] = {
val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending
initialization to $id $ip")
diff --git a/tests/build.gradle b/tests/build.gradle
index 3a07e70..b33e2c6 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -52,7 +52,6 @@ dependencies {
compile 'com.typesafe.akka:akka-testkit_2.11:2.4.16'
compile 'com.google.code.gson:gson:2.3.1'
compile 'org.scalamock:scalamock-scalatest-support_2.11:3.4.2'
- compile 'com.typesafe.play:play-ws_2.11:2.5.11'
compile project(':common:scala')
compile project(':core:controller')
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 978114d..e10c792 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,7 +31,6 @@ import scala.language.postfixOps
import scala.sys.process.ProcessLogger
import scala.sys.process.stringToProcess
import scala.util.Random
-import scala.util.Try
import org.apache.commons.lang3.StringUtils
import org.scalatest.FlatSpec
@@ -159,26 +158,8 @@ object ActionContainer {
}
}
- private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)(
- implicit actorSystem: ActorSystem): (Int, Option[JsObject]) = {
- import akka.http.scaladsl.model._
- import akka.http.scaladsl.unmarshalling._
- import akka.stream.ActorMaterializer
- import common.AkkaHttpUtils
-
- implicit val materializer = ActorMaterializer()
-
- val uri = Uri(
- scheme = "http",
- authority = Uri.Authority(host = Uri.Host(host), port = port),
- path = Uri.Path(endPoint))
-
- val f = for (
- response <- AkkaHttpUtils.singleRequest(uri.toString(), content, 90.seconds,
retryOnTCPErrors = true);
- responseBody <- Unmarshal(response.body).to[String]
- ) yield (response.status.intValue, Try(responseBody.parseJson.asJsObject).toOption)
-
- Await.result(f, 90.seconds)
+ private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int,
Option[JsObject]) = {
+ whisk.core.container.HttpUtils.post(host, port, endPoint, content)
}
private class ActionContainerImpl() extends ActionContainer {
diff --git a/tests/src/test/scala/common/AkkaHttpUtils.scala b/tests/src/test/scala/common/AkkaHttpUtils.scala
deleted file mode 100644
index 46faa69..0000000
--- a/tests/src/test/scala/common/AkkaHttpUtils.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package common
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model._
-import akka.stream.ActorMaterializer
-import play.api.http.{ContentTypeOf, Writeable}
-import play.api.libs.ws.WSResponse
-import play.api.libs.ws.ahc.AhcWSClient
-import play.api.mvc.Codec
-import spray.json.JsValue
-
-import scala.concurrent.{Await, Future, Promise}
-import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
-import scala.util.Try
-
-object AkkaHttpUtils {
-
- // Writable for spray-json is required
- implicit def sprayJsonContentType(implicit codec: Codec): ContentTypeOf[JsValue] = {
- ContentTypeOf[JsValue](Some(ContentTypes.`application/json`.toString()))
- }
- implicit def sprayJsonWriteable(implicit codec: Codec): Writeable[JsValue] = {
- Writeable(message => codec.encode(message.toString()))
- }
-
- def singleRequestBlocking(
- uri: String,
- content: JsValue,
- timeout: FiniteDuration,
- retryOnTCPErrors: Boolean = false,
- retryOn4xxErrors: Boolean = false,
- retryOn5xxErrors: Boolean = false,
- retryInterval: FiniteDuration = 100.milliseconds)
- (implicit system: ActorSystem) : Try[WSResponse] = {
-
- val f = singleRequest(
- uri, content, timeout, retryOnTCPErrors, retryOn4xxErrors, retryOn5xxErrors,
retryInterval
- )
-
- // Duration.Inf is not an issue, since singleRequest has a built-in timeout mechanism.
- Await.ready(f, Duration.Inf)
-
- f.value.get
- }
-
- // Makes a request, expects a successful within timeout, retries on selected
- // errors until timeout has passed.
- def singleRequest(
- uri: String,
- content: JsValue,
- timeout: FiniteDuration,
- retryOnTCPErrors: Boolean = false,
- retryOn4xxErrors: Boolean = false,
- retryOn5xxErrors: Boolean = false,
- retryInterval: FiniteDuration = 100.milliseconds)
- (implicit system: ActorSystem) : Future[WSResponse] = {
- implicit val executionContext = system.dispatcher
- implicit val materializer = ActorMaterializer()
- val wsClient = AhcWSClient()
-
- val timeoutException = new TimeoutException(s"Request to ${uri} could not be completed
in time.")
-
- val promise = Promise[WSResponse]
-
- // Timeout includes all retries.
- system.scheduler.scheduleOnce(timeout) {
- promise.tryFailure(timeoutException)
- }
-
- def tryOnce() : Unit = if(!promise.isCompleted) {
- val f = wsClient.url(uri).withRequestTimeout(timeout).post(content)
- f.onSuccess {
- case r if r.status >= 400 && r.status < 500 && retryOn4xxErrors
=>
- system.scheduler.scheduleOnce(retryInterval) { tryOnce() }
- case r if r.status >= 500 && r.status < 600 && retryOn5xxErrors
=>
- system.scheduler.scheduleOnce(retryInterval) { tryOnce() }
- case r =>
- wsClient.close()
- promise.trySuccess(r)
- }
-
- f.onFailure {
- case s : java.net.ConnectException if retryOnTCPErrors =>
- // TCP error (e.g. connection couldn't be opened)
- system.scheduler.scheduleOnce(retryInterval) { tryOnce() }
-
- case t : Throwable =>
- // Other error. We fail the promise.
- promise.tryFailure(t)
- }
- }
-
- tryOnce()
-
- promise.future
- }
-}
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].
|