Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1755F200B2D for ; Thu, 16 Jun 2016 21:27:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 16059160A52; Thu, 16 Jun 2016 19:27:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5E443160A51 for ; Thu, 16 Jun 2016 21:27:22 +0200 (CEST) Received: (qmail 48668 invoked by uid 500); 16 Jun 2016 19:27:21 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 48652 invoked by uid 99); 16 Jun 2016 19:27:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jun 2016 19:27:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6974DE00DB; Thu, 16 Jun 2016 19:27:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dkulp@apache.org To: commits@camel.apache.org Date: Thu, 16 Jun 2016 19:27:22 -0000 Message-Id: In-Reply-To: <40e8b150d07046139daab1448e9c2d74@git.apache.org> References: <40e8b150d07046139daab1448e9c2d74@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] camel git commit: [CAMEL-10065] Add a test case archived-at: Thu, 16 Jun 2016 19:27:23 -0000 [CAMEL-10065] Add a test case Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4236142 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4236142 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4236142 Branch: refs/heads/camel-2.17.x Commit: c4236142c8c4e8778343a73db9331e34569a69a5 Parents: b7eed97 Author: Daniel Kulp Authored: Thu Jun 16 13:23:37 2016 -0400 Committer: Daniel Kulp Committed: Thu Jun 16 15:19:38 2016 -0400 ---------------------------------------------------------------------- .../component/kafka/KafkaProducerFullTest.java | 29 ++++++++++++++++++++ 1 file changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c4236142/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index 0bb4740..d5b65fa 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -17,8 +17,10 @@ package org.apache.camel.component.kafka; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -125,6 +127,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); } + + @Test + public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException { + int messageInTopic = 10; + int messageInOtherTopic = 5; + + CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic); + + List msgs = new ArrayList(); + for (int x = 0; x < messageInTopic; x++) { + msgs.add("Message " + x); + } + + sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1"); + msgs = new ArrayList(); + for (int x = 0; x < messageInOtherTopic; x++) { + msgs.add("Other Message " + x); + } + sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER); + + createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch); + + boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); + + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); + } + @Test public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException { int messageInTopic = 10;