From: Anil
Date: Mon, 24 Oct 2016 12:14:53 +0530
Subject: Kafka Streamer
To: user@ignite.apache.org

Hi,

I am experimenting with the Kafka streamer for my use case and noticed that the Kafka message has to be the value of the Ignite cache entry:

    getStreamer().addData(msg.key(), msg.message());

(https://github.com/apache/ignite/blob/master/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java)

I tried using a stream receiver to convert an incoming Kafka message into a number of cache entries, but that did not help. It seems the stream receiver is not a pre-processing step for cache entries. Is that correct?

To let clients plug in their own processing, the Kafka streamer should provide a way to transform a Kafka message into cache entries. What do you think?

The code would look something like this (just pseudocode):

    kafkaStreamer.registerTransformer(<some transformer>);

    if (transformer != null) {
        getStreamer().addData(transformer.transform(msg.message()));
    } else {
        getStreamer().addData(msg.key(), msg.message());
    }

Thanks for your help.
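
The transformer idea in this message could be sketched roughly as follows. This is a minimal, self-contained illustration and not Ignite code: `Sink`, `Streamer`, `MapSink`, and `splitPairs` are hypothetical names, `Sink` only mirrors the `addData(key, value)` and `addData(Map)` overloads of `IgniteDataStreamer`, and the `k1=v1;k2=v2` message format is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch of the proposed transformer hook; none of these types exist in Ignite. */
class KafkaTransformerSketch {

    /** Hypothetical stand-in mirroring two IgniteDataStreamer.addData overloads. */
    interface Sink<K, V> {
        void addData(K key, V val);
        void addData(Map<K, V> entries);
    }

    /** Hypothetical streamer that can map one message to many cache entries. */
    static class Streamer<K, V> {
        private final Sink<K, V> sink;
        private Function<V, Map<K, V>> transformer; // null means "use the default path"

        Streamer(Sink<K, V> sink) { this.sink = sink; }

        void registerTransformer(Function<V, Map<K, V>> transformer) {
            this.transformer = transformer;
        }

        /** Called once per consumed message. */
        void onMessage(K key, V message) {
            if (transformer != null)
                sink.addData(transformer.apply(message)); // one message, many entries
            else
                sink.addData(key, message);               // message stored as the value
        }
    }

    /** In-memory sink so the sketch runs without a cluster. */
    static class MapSink<K, V> implements Sink<K, V> {
        final Map<K, V> cache = new LinkedHashMap<>();
        @Override public void addData(K key, V val) { cache.put(key, val); }
        @Override public void addData(Map<K, V> entries) { cache.putAll(entries); }
    }

    /** Example transformer (assumed format): "k1=v1;k2=v2" becomes one entry per pair. */
    static Map<String, String> splitPairs(String message) {
        Map<String, String> entries = new LinkedHashMap<>();
        for (String pair : message.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2)
                entries.put(kv[0].trim(), kv[1].trim());
        }
        return entries;
    }

    public static void main(String[] args) {
        MapSink<String, String> sink = new MapSink<>();
        Streamer<String, String> streamer = new Streamer<>(sink);

        streamer.onMessage("m1", "a=1;b=2"); // default path: a single entry keyed "m1"
        streamer.registerTransformer(KafkaTransformerSketch::splitPairs);
        streamer.onMessage("m2", "c=3;d=4"); // transformed path: entries "c" and "d"

        System.out.println(sink.cache);
    }
}
```

Keeping the transformer optional preserves the existing behaviour for users who do not register one, which matches the `if`/`else` in the pseudocode.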