From commits-return-5968-archive-asf-public=cust-asf.ponee.io@openwhisk.apache.org Thu Sep 27 22:18:35 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 90F39180675 for ; Thu, 27 Sep 2018 22:18:34 +0200 (CEST) Received: (qmail 51700 invoked by uid 500); 27 Sep 2018 20:18:33 -0000 Mailing-List: contact commits-help@openwhisk.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@openwhisk.apache.org Delivered-To: mailing list commits@openwhisk.apache.org Received: (qmail 51691 invoked by uid 99); 27 Sep 2018 20:18:33 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Sep 2018 20:18:33 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D473B82F35; Thu, 27 Sep 2018 20:18:32 +0000 (UTC) Date: Thu, 27 Sep 2018 20:18:33 +0000 To: "commits@openwhisk.apache.org" Subject: [incubator-openwhisk-package-kafka] 01/01: MessageHubFeedTests Resiliency Updates MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: dubeejw@apache.org In-Reply-To: <153807951277.20210.5588949253720092574@gitbox.apache.org> References: <153807951277.20210.5588949253720092574@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-openwhisk-package-kafka X-Git-Refname: refs/heads/feed-test-resiliency X-Git-Reftype: branch X-Git-Rev: 74d5fa68393984955aacc9ae0b387fbef577526f X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180927201832.D473B82F35@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch feed-test-resiliency in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git commit 74d5fa68393984955aacc9ae0b387fbef577526f Author: dubeejw AuthorDate: Thu Sep 27 16:17:52 2018 -0400 MessageHubFeedTests Resiliency Updates --- .../dat/createTriggerActionsFromEncodedMessage.js | 11 ++ .../system/packages/MessageHubFeedTests.scala | 194 +++++---------------- 2 files changed, 56 insertions(+), 149 deletions(-) diff --git a/tests/dat/createTriggerActionsFromEncodedMessage.js b/tests/dat/createTriggerActionsFromEncodedMessage.js new file mode 100644 index 0000000..0c27eb8 --- /dev/null +++ b/tests/dat/createTriggerActionsFromEncodedMessage.js @@ -0,0 +1,11 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more contributor +// license agreements; and to You under the Apache License, Version 2.0. + +var openwhisk = require('openwhisk'); + +function main(params) { + console.log(JSON.stringify(params)); + var name = new Buffer(params.messages[0].value, 'base64').toString('ascii'); + var ow = openwhisk({ignore_certs: true}); + return ow.triggers.create({name: name}); +} diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala index f2e6180..d970d4b 100644 --- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala @@ -20,7 +20,6 @@ import system.utils.KafkaUtils import scala.concurrent.duration.DurationInt import scala.language.postfixOps - import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec @@ -28,23 +27,17 @@ import org.scalatest.Matchers import org.scalatest.Inside import org.scalatest.junit.JUnitRunner import org.apache.kafka.clients.producer.ProducerRecord - import spray.json.DefaultJsonProtocol._ import spray.json._ - import common.JsHelpers -import common.TestUtils import common.TestHelpers import common.Wsk import common.WskActorSystem import common.WskProps import common.WskTestHelpers - import ActionHelper._ -import java.util.Base64 -import java.nio.charset.StandardCharsets - +import common.TestUtils.NOT_FOUND import whisk.utils.retry @RunWith(classOf[JUnitRunner]) @@ -75,8 +68,6 @@ class MessageHubFeedTests val wsk = new Wsk() val actionName = s"${messagingPackage}/${messageHubFeed}" - val defaultAction = Some(TestUtils.getTestActionFilename("hello.js")) - behavior of "Message Hub feed action" it should "reject invocation when topic argument is missing" in { @@ -126,78 +117,7 @@ class MessageHubFeedTests runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json", expectedOutput, false) } - it should "fire a trigger when a binary message is posted to message hub" in withAssetCleaner(wskprops) { - val currentTime = s"${System.currentTimeMillis}" - - (wp, assetHelper) => - val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger ${triggerName}") - - createTrigger(assetHelper, triggerName, parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson, - "isBinaryKey" -> true.toJson, - "isBinaryValue" -> true.toJson)) - - val defaultActionName = s"helloKafka-${currentTime}" - - assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => - action.create(name, defaultAction) - } - assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) => - rule.create(name, trigger = triggerName, action = defaultActionName) - } - - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) - - // key to use for the produced message - val key = "TheKey" - val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8)) - val encodedKey = Base64.getEncoder.encodeToString(key.getBytes(StandardCharsets.UTF_8)) - - withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson, - "key" -> key.toJson, - "value" -> currentTime.toJson))) { - _.response.success shouldBe true - } - - retry({ - println("Polling for activations") - val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries) - assert(activations.nonEmpty) - - val matchingActivations = for { - id <- activations - activation = wsk.activation.waitForActivation(id) - if (activation.isRight && activation.right.get.fields.get("response").toString.contains(encodedCurrentTime)) - } yield activation.right.get - - assert(matchingActivations.nonEmpty) - - val activation = matchingActivations.head - activation.getFieldPath("response", "success") shouldBe Some(true.toJson) - - // assert that there exists a message in the activation which has the expected keys and values - val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = encodedCurrentTime) - assert(messages.length == 1) - - val message = messages.head - message.getFieldPath("topic") shouldBe Some(topic.toJson) - message.getFieldPath("key") shouldBe Some(encodedKey.toJson) - }, N = 3) - } - - it should "not fire a single trigger with an oversized payload" in withAssetCleaner(wskprops) { + it should "fire multiple triggers for two large payloads" in withAssetCleaner(wskprops) { // payload size should be under the payload limit, but greater than 50% of the limit val testPayloadSize = 600000 @@ -218,6 +138,7 @@ class MessageHubFeedTests "isBinaryKey" -> false.toJson, "isBinaryValue" -> false.toJson)) + val defaultAction = Some("dat/createTriggerActionsFromKey.js") val defaultActionName = s"helloKafka-${currentTime}" assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => @@ -227,6 +148,16 @@ class MessageHubFeedTests rule.create(name, trigger = triggerName, action = defaultActionName) } + val verificationName1 = s"trigger1-$currentTime" + val verificationName2 = s"trigger2-$currentTime" + + assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + // It takes a moment for the consumer to fully initialize. println("Giving the consumer a moment to get ready") Thread.sleep(consumerInitTime) @@ -235,27 +166,14 @@ class MessageHubFeedTests // This should ensure that the feed fires these as two separate triggers. println("Rapidly producing two large messages") val producer = kafkaUtils.createProducer() - val firstMessage = new ProducerRecord(topic, "key", generateMessage(s"first${currentTime}", testPayloadSize)) - val secondMessage = new ProducerRecord(topic, "key", generateMessage(s"second${currentTime}", testPayloadSize)) + val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize)) + val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize)) producer.send(firstMessage) producer.send(secondMessage) producer.close() - retry({ - // verify there are two trigger activations required to handle these messages - println("Polling for activations") - val activations = wsk.activation.pollFor(N = 2, Some(triggerName), retries = maxRetries) - - println("Verifying activation content") - val matchingActivations = for { - id <- activations - activation = wsk.activation.waitForActivation(id) - if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") || - activation.right.get.fields.get("response").toString.contains(s"second${currentTime}"))) - } yield activation.right.get - - assert(matchingActivations.length == 2) - }, N = 3) + retry(wsk.trigger.get(verificationName1), 60, Some(1.second)) + retry(wsk.trigger.get(verificationName2), 60, Some(1.second)) } it should "not fire a trigger for a single oversized message" in withAssetCleaner(wskprops) { @@ -279,6 +197,7 @@ class MessageHubFeedTests "isBinaryKey" -> false.toJson, "isBinaryValue" -> false.toJson)) + val defaultAction = Some("dat/createTriggerActionsFromKey.js") val defaultActionName = s"helloKafka-${currentTime}" assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => @@ -288,30 +207,21 @@ class MessageHubFeedTests rule.create(name, trigger = triggerName, action = defaultActionName) } + val verificationName = s"trigger-$currentTime" + + wsk.trigger.get(verificationName, NOT_FOUND) + // It takes a moment for the consumer to fully initialize. println("Giving the consumer a moment to get ready") Thread.sleep(consumerInitTime) println("Producing an oversized message") val producer = kafkaUtils.createProducer() - val bigMessage = new ProducerRecord(topic, "key", generateMessage(s"${currentTime}", testPayloadSize)) + val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize)) producer.send(bigMessage) producer.close() - retry({ - // verify there are no activations that match - println("Polling for activations") - val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries) - - println("Verifying activation content") - val matchingActivations = for { - id <- activations - activation = wsk.activation.waitForActivation(id) - if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}"))) - } yield activation.right.get - - assert(matchingActivations.isEmpty) - }, N = 3) + a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second)) } it should "reject trigger update without passing in any updatable parameters" in withAssetCleaner(wskprops) { @@ -485,15 +395,22 @@ class MessageHubFeedTests "topic" -> topic.toJson )) + val defaultAction1 = Some("dat/createTriggerActions.js") val defaultActionName = s"helloKafka-${currentTime}" assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => - action.create(name, defaultAction) + action.create(name, defaultAction1) } assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) => rule.create(name, trigger = triggerName, action = defaultActionName) } + val verificationName1 = s"trigger1-$currentTime" + + assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + println("Giving the consumer a moment to get ready") Thread.sleep(consumerInitTime) @@ -504,12 +421,12 @@ class MessageHubFeedTests "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), "topic" -> topic.toJson, "key" -> key.toJson, - "value" -> currentTime.toJson + "value" -> verificationName1.toJson ))) { _.response.success shouldBe true } - checkForActivations(1, triggerName, topic, key, currentTime) + retry(wsk.trigger.get(verificationName1), 60, Some(1.second)) println("Updating trigger") @@ -524,11 +441,18 @@ class MessageHubFeedTests _.response.success shouldBe true } + val verificationName2 = s"trigger2-$currentTime" + + assetHelper.withCleaner(wsk.trigger, verificationName2) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + + val defaultAction2 = Some("dat/createTriggerActionsFromEncodedMessage.js") + wsk.action.create(defaultActionName, defaultAction2, update = true) + println("Giving the consumer a moment to get ready") Thread.sleep(consumerInitTime) - val encodedCurrentTime = Base64.getEncoder.encodeToString(currentTime.getBytes(StandardCharsets.UTF_8)) - println("Producing a message") withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( "user" -> kafkaUtils.getAsJson("user"), @@ -536,12 +460,12 @@ class MessageHubFeedTests "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), "topic" -> topic.toJson, "key" -> key.toJson, - "value" -> currentTime.toJson + "value" -> verificationName2.toJson ))) { _.response.success shouldBe true } - checkForActivations(2, triggerName, topic, key, encodedCurrentTime) + retry(wsk.trigger.get(verificationName2), 60, Some(1.second)) } def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = { @@ -557,34 +481,6 @@ class MessageHubFeedTests } } - def checkForActivations(numActivations: Int, triggerName: String, topic: String, key: String, value: String) = { - retry({ - println("Polling for activations") - val activations = wsk.activation.pollFor(N = numActivations, Some(triggerName), retries = maxRetries) - assert(activations.nonEmpty) - - println("Validating content of activation(s)") - val matchingActivations = for { - id <- activations - activation = wsk.activation.waitForActivation(id) - if (activation.isRight && activation.right.get.fields.get("response").toString.contains(value)) - } yield activation.right.get - - assert(matchingActivations.nonEmpty) - - val activation = matchingActivations.head - activation.getFieldPath("response", "success") shouldBe Some(true.toJson) - - // assert that there exists a message in the activation which has the expected keys and values - val messages = KafkaUtils.messagesInActivation(activation, field = "value", value = value) - assert(messages.length == 1) - - val message = messages.head - message.getFieldPath("topic") shouldBe Some(topic.toJson) - message.getFieldPath("key") shouldBe Some(key.toJson) - }, N = 3) - } - def generateMessage(prefix: String, size: Int): String = { val longString = Array.fill[String](size)("0").mkString("") s"${prefix}${longString}"