From: Ewen Cheslack-Postava
Date: Mon, 2 Jan 2017 11:06:33 -0800
Subject: Re: Multiple Consumer Group In Single Topic
To: dev@kafka.apache.org

Rico,

Every consumer group will see all messages for the topics they are
subscribed to. If you want to filter these messages by consumer group,
you'd need to perform that filtering yourself after the messages are
returned by the consumer. To do so, you'd need to include enough
information in the messages to perform this filtering (e.g. you might
include metadata indicating which consumer group the message is intended
for, although if you are doing that it might be better to simply use
different topics for the two sets of messages).

-Ewen

On Fri, Dec 16, 2016 at 12:28 AM, Rico Lugod wrote:

> Hi Guys,
>
> Good day!
>
> I have question regarding how to consume a specific message belongs to that
> consumer group?
>
> Here's the scenario:
>
> Publish message "A" to topic "X"
> Consume by Consumer Group A the message "A" from topic "X"
>
> Publish message "B" to topic "X"
> Consume by Consumer Group B the message "B" from topic "X"
>
> How can achieve this in scala? Because both consumers will consume the
> message even if it assigned a groupId.
>
> Here is my producer code:
>
> def main(args: Array[String]): Unit = {
>   val zookeeperUrl: String = "localhost:2182"
>   val kafkaServerUrl: String = "localhost:9092,localhost:9093"
>   val topic: String = "Topic_X"
>   val groupId: String = "Consumer_Group_A"
>   val eventType: String = "delete"
>   val deleteRetention: Int = 5240000
>   val producerConfig =
>     ProducerService.createProducerConfig(zookeeperUrl, kafkaServerUrl,
>       topic, groupId, deleteRetention.toString)
>   val producer = new KafkaProducer[String, String](producerConfig)
>   val producerPayload = ProducerPayload("{\"batch_id\":\"" +
>     UUID.randomUUID().toString()
>     + "\", \"document_id\":\"" + UUID.randomUUID().toString()
>     + "\", \"type\":\"" + eventType
>     + "\"}", "", topic, groupId, deleteRetention)
>   ProducerService.sendMessage(producer, zookeeperUrl, kafkaServerUrl,
>     producerPayload)
>   logger.info("Done.")
> }
>
>
> Here is my consumer code:
>
> def consumeMessage = Action { implicit rs =>
>   val zookeeperUrl: String = "localhost:2182"
>   val kafkaServerUrl: String = "localhost:9092,localhost:9093"
>   val topic: String = "Topic_X"
>   val groupId: String = "Consumer_Group_A"
>   val config = ConsumerService.createConsumerConfig(zookeeperUrl,
>     kafkaServerUrl, groupId)
>   val consumer = kafka.consumer.Consumer.create(config)
>   val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
>   val streams = consumerMap.get(topic).get
>   val it = streams(0).iterator()
>   while (it.hasNext()) {
>     val msg = new String(it.next().message())
>     logger.info(s"Message successfully consumed from topic ${topic} => "
>       + msg)
>   }
>   consumer.shutdown()
>   logger.info("Done.")
>   Ok
> }
>
>
> Your help is much appreciated. Thank you
>
> *Sincerely yours,*
>
> *Rico Nodalo Lugod*
> Senior Java / J2EE / SOA - Developer
> Cebu City, Philippines 6000
>
> Email: rnl2004@gmail.com
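
To make the filtering suggestion in the reply concrete, here is a minimal sketch in plain Scala. It is not taken from the thread and makes no Kafka calls. It assumes the producer prefixes each payload with the intended consumer group and a `|` separator. The `Envelope` and `GroupFilter` names and the separator convention are hypothetical, chosen only for illustration.

```scala
// Hypothetical envelope: the intended consumer group travels with the payload.
final case class Envelope(targetGroup: String, payload: String)

object Envelope {
  // Assumed wire convention: "<targetGroup>|<payload>".
  private val Separator = '|'

  // Producer side: prefix the payload with the intended group.
  def encode(e: Envelope): String =
    s"${e.targetGroup}${Separator}${e.payload}"

  // Consumer side: split on the first separator only; None if it is missing.
  def decode(raw: String): Option[Envelope] = {
    val idx = raw.indexOf(Separator)
    if (idx < 0) None
    else Some(Envelope(raw.substring(0, idx), raw.substring(idx + 1)))
  }
}

object GroupFilter {
  // Every group still receives every message on the topic; this keeps only
  // the payloads addressed to `groupId` and drops the rest, including
  // messages that do not follow the assumed convention.
  def forGroup(groupId: String, rawMessages: Iterator[String]): Iterator[String] =
    rawMessages.map(Envelope.decode).collect {
      case Some(Envelope(`groupId`, payload)) => payload
    }
}
```

In the quoted consumer loop, the decoded `msg` string would go through `Envelope.decode` before being logged or processed. As the reply notes, once messages carry this kind of routing metadata, a separate topic per set of messages is probably the simpler design, since each group then subscribes only to what it needs.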