openwhisk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From japet...@apache.org
Subject [incubator-openwhisk-package-kafka] branch master updated: Update Tests to Wait for Producer to Finish (#306)
Date Mon, 12 Nov 2018 20:51:22 GMT
This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 573d52b  Update Tests to Wait for Producer to Finish (#306)
573d52b is described below

commit 573d52bddd5339582ebeb140520b9fa2e2041587
Author: James Dubee <jwdubee@us.ibm.com>
AuthorDate: Mon Nov 12 15:51:18 2018 -0500

    Update Tests to Wait for Producer to Finish (#306)
    
    * Wait for producer to finish
    
    * Expect Exception from Producer for Oversized Payload
    
    * Add comment to test
    
    * Increase producer max.request.size
---
 .../test/scala/system/health/BasicHealthTest.scala    | 13 -------------
 .../scala/system/packages/MessageHubFeedTests.scala   |  5 +++--
 tests/src/test/scala/system/utils/KafkaUtils.scala    | 19 ++++++++++++++++---
 3 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 7b90679..ae8c788 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,11 +17,8 @@
 
 package system.health
 
-import java.util.concurrent.{TimeUnit, TimeoutException}
-
 import common.TestUtils.NOT_FOUND
 import common._
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
@@ -101,16 +98,6 @@ class BasicHealthTest
 
       produceMessage(topic, key, verificationName)
 
-      try {
-        val result = future.get(60, TimeUnit.SECONDS)
-
-        println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
-      } catch {
-        case e: TimeoutException =>
-          fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
-        case e: Exception => throw e
-      }
-
       // Check if the trigger, that should have been created as reaction on the kafka-message, has been created.
       // The trigger should have been created by the action, that has been triggered by the kafka message.
       // If we cannot find it, the most probably the action did not run.
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d691125..941f846 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -38,6 +38,7 @@ import common.WskTestHelpers
 import ActionHelper._
 import common.TestUtils.NOT_FOUND
 import org.apache.openwhisk.utils.retry
+import java.util.concurrent.ExecutionException
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -197,9 +198,9 @@ class MessageHubFeedTests
       val verificationName = s"trigger-$currentTime"
 
       wsk.trigger.get(verificationName, NOT_FOUND)
-      println("Producing an oversized message")
-      produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
 
+      // The producer will generate an error as the payload size is too large for the MessageHub brokers
+      a[ExecutionException] should be thrownBy produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
       a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index 62361a6..19345b3 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -19,6 +19,7 @@ package system.utils
 
 import java.util.HashMap
 import java.util.Properties
+import java.util.concurrent.{TimeUnit, TimeoutException}
 
 import com.jayway.restassured.RestAssured
 import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
@@ -38,6 +39,7 @@ import common.TestHelpers
 import common.TestUtils
 import common.WskTestHelpers
 import org.apache.openwhisk.utils.retry
+import org.apache.kafka.clients.producer.ProducerRecord
 
 trait KafkaUtils extends TestHelpers with WskTestHelpers {
     lazy val messageHubProps = KafkaUtils.initializeMessageHub()
@@ -126,6 +128,16 @@ trait KafkaUtils extends TestHelpers with WskTestHelpers {
 
         producer.flush()
         producer.close()
+
+        try {
+          val result = future.get(60, TimeUnit.SECONDS)
+
+          println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
+        } catch {
+          case e: TimeoutException =>
+            fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
+          case e: Exception => throw e
+        }
     }
 }
 
@@ -138,7 +150,8 @@ object KafkaUtils {
                                 "password",
                                 "key.serializer",
                                 "value.serializer",
-                                "security.protocol")
+                                "security.protocol",
+                                "max.request.size")
 
         val propertyMap = props.filterKeys(
             requiredKeys.contains(_)
@@ -176,7 +189,7 @@ object KafkaUtils {
         val security_protocol = ("security.protocol", "SASL_SSL");
         val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
+        val maxRequestSize = ("max.request.size", "3000000");
         var brokerList = new ListBuffer[String]()
         val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray()
         val brokerIterator = jsonArray.iterator()
@@ -190,7 +203,7 @@ object KafkaUtils {
         System.setProperty("java.security.auth.login.config", "")
         setMessageHubSecurityConfiguration(user._2, password._2)
 
-        Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer)
+        Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize)
     }
 
     private def setMessageHubSecurityConfiguration(user: String, password: String) = {


Mime
View raw message