From: "Federico D'Ambrosio"
Date: Mon, 25 Sep 2017 17:15:55 +0200
Subject: Re: Using HiveBolt from storm-hive with Flink-Storm compatibility wrapper
To: Timo Walther
Cc: user@flink.apache.org

Hi Timo,

I didn't think about the jdbc-connector (I had actually forgotten it existed) and I'll certainly look into it.

So far I have been trying to implement a simple sink (JSON only, for now), starting from the base provided by the Storm HiveBolt implementation; my goal is to make use of the Hive Streaming API.

I noticed that the Hive API contains some potentially blocking calls, for example when a TransactionBatch is committed or when the StreamingConnection is closed. In your opinion, what would be the best way to deal with such calls in Flink? Wrapping them in an AsyncFunction? Simply spawning a new thread?

Kind regards,
Federico

2017-09-25 16:43 GMT+02:00 Timo Walther:

> Hi Federico,
>
> I think going through a Storm compatibility layer could work, but did you
> think about using the flink-jdbc connector? That should be the easiest
> solution.
>
> Otherwise, I think it would be easier to quickly implement your own
> SinkFunction. There is just one method to implement, and you could call
> some Hive commands there.
>
> Regards,
> Timo
>
> On 9/25/17 at 4:16 PM, Nico Kruber wrote:
>
>> Hi Federico,
>> I also did not find any implementation of a Hive sink, nor many details
>> on this topic in general. Let me forward this to Timo and Fabian (cc'd),
>> who may know more.
>>
>> Nico
>>
>> On Friday, 22 September 2017 12:14:32 CEST Federico D'Ambrosio wrote:
>>
>>> Hello everyone,
>>>
>>> I'd like to use the HiveBolt from storm-hive inside a Flink job using
>>> the Flink-Storm compatibility layer, but I'm not sure how to integrate
>>> it. Let me explain: I would have the following:
>>>
>>> val mapper = ...
>>>
>>> val hiveOptions = ...
>>>
>>> streamByID
>>>   .transform[OUT]("hive-sink", new BoltWrapper[IN, OUT](new
>>> HiveBolt(hiveOptions)))
>>>
>>> where streamByID is a DataStream[Event].
>>>
>>> What would the IN and OUT types be? HiveBolt executes on a Storm Tuple,
>>> so I'd think that IN should be the Event "tuple-d" (event => (field1,
>>> field2, field3, ...)), while OUT, since I don't want the stream to keep
>>> flowing, would be null or None?
>>>
>>> Alternatively, do you know of any implementation of a Hive sink in
>>> Flink, other than an adaptation of the aforementioned HiveBolt into a
>>> RichSinkFunction?
>>>
>>> Thanks for your attention,
>>> Federico

--
Federico D'Ambrosio
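[Editor's sketch] The SinkFunction route Timo suggests, combined with the Hive Streaming API that HiveBolt uses internally, could look roughly like the following. This is a minimal illustration, not a tested implementation: it assumes Hive's org.apache.hive.hcatalog.streaming package (HiveEndPoint, StrictJsonWriter, TransactionBatch) as used by storm-hive, and all class and parameter names here (HiveJsonSink, metastoreUri, txnsPerBatch, ...) are made up for the example.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hive.hcatalog.streaming.{HiveEndPoint, StreamingConnection, StrictJsonWriter, TransactionBatch}

// Sketch of a JSON-only Hive sink in the spirit of storm-hive's HiveBolt.
// Error handling, batching of records per transaction, and partitioning
// are omitted for brevity.
class HiveJsonSink(metastoreUri: String, database: String, table: String,
                   txnsPerBatch: Int) extends RichSinkFunction[String] {

  @transient private var connection: StreamingConnection = _
  @transient private var writer: StrictJsonWriter = _
  @transient private var txnBatch: TransactionBatch = _

  override def open(parameters: Configuration): Unit = {
    val endPoint = new HiveEndPoint(metastoreUri, database, table, null)
    writer = new StrictJsonWriter(endPoint)
    connection = endPoint.newConnection(true) // create partition if missing
    txnBatch = connection.fetchTransactionBatch(txnsPerBatch, writer)
    txnBatch.beginNextTransaction()
  }

  override def invoke(json: String): Unit = {
    txnBatch.write(json.getBytes("UTF-8"))
    // commit() is one of the potentially blocking calls discussed above.
    // Running it synchronously inside invoke() simply back-pressures the
    // stream; a real sink would commit every N records, not every record.
    txnBatch.commit()
    if (txnBatch.remainingTransactions() == 0) {
      txnBatch.close()
      txnBatch = connection.fetchTransactionBatch(txnsPerBatch, writer)
    }
    txnBatch.beginNextTransaction()
  }

  override def close(): Unit = {
    if (txnBatch != null) txnBatch.close()
    if (connection != null) connection.close()
  }
}
```

On the blocking-calls question: AsyncFunction is aimed at request/response operators in the middle of a pipeline (e.g. enrichment lookups) rather than at sinks, so for a sink the usual choices are either synchronous calls as above, letting Flink's back-pressure throttle the source, or a dedicated flush thread/ExecutorService with a bounded queue if commit latency must be hidden from invoke().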