From: Paul Joireman
To: "user@flink.apache.org"
Subject: Re: Getting key from keyed stream
Date: Thu, 12 Jan 2017 23:19:04 +0000

Thanks Jamie,


Just figured that out after some digging and a little trial and error; that works great.


Paul


From: Jamie Grier <jamie@data-artisans.com>
Sent: Thursday, January 12, 2017 4:59:43 PM
To: user@flink.apache.org
Subject: Re: Getting key from keyed stream
 

A simpler and more efficient approach would be the following:

val stream = env.addSource(new FlinkKafkaConsumer(...))

stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))

env.execute()

In MyKeyedSerializationSchema just override the getTargetTopic() method.
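
As a minimal sketch of what such a schema might look like (an illustration under stated assumptions, not Jamie's actual code): the trait `KeyedSchema` below is a local stand-in that mirrors the three methods of Flink's `KeyedSerializationSchema` (`serializeKey`, `serializeValue`, `getTargetTopic`) so the example compiles without Flink on the classpath. The `Message` case class and its `field` and `payload` members are hypothetical.

```scala
import java.nio.charset.StandardCharsets

// Local stand-in (assumption) mirroring the method names of Flink's
// KeyedSerializationSchema, so this sketch runs without Flink installed.
trait KeyedSchema[T] {
  def serializeKey(element: T): Array[Byte]
  def serializeValue(element: T): Array[Byte]
  def getTargetTopic(element: T): String
}

// Hypothetical message type: `field` is the value that decides the topic.
final case class Message(field: String, payload: String)

class MyKeyedSerializationSchema extends KeyedSchema[Message] {
  // Empty key: this sketch does not use a Kafka message key.
  override def serializeKey(element: Message): Array[Byte] =
    Array.emptyByteArray

  // Write the original payload back unchanged, encoded as UTF-8.
  override def serializeValue(element: Message): Array[Byte] =
    element.payload.getBytes(StandardCharsets.UTF_8)

  // Route each record to the topic named by its own field.
  override def getTargetTopic(element: Message): String =
    element.field
}
```

In a real job the class would presumably extend Flink's `KeyedSerializationSchema[Message]` rather than the stand-in. Because the topic is derived from each record inside the schema, the stream does not need to be keyed at all.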

That should do it :)

-Jamie

On Thu, Jan 12, 2017 at 12:53 PM, Paul Joireman <paul.joireman@physiq.com> wrote:

Hi all,


Is there a simple way to read the key from a KeyedStream? Very simply, I'm trying to read a message from Kafka, separate the incoming messages by a field in the message, and write the original message back to Kafka using that field as the new topic. I chose to partition the incoming stream by creating a KeyedStream and using the field from the message as the key. The only thing left is to write the message to Kafka with a producer, but I need to know the topic to write to, and for that I need to be able to read the key. Is there a way to do this?


Is there a better way to do this than using a KeyedStream?


Paul

--

Jamie Grier
data Artisans, Director of Applications Engineering
