From: Fabian Hueske
Date: Wed, 1 Nov 2017 08:57:11 +0100
Subject: Re: Batch job per stream message?
To: Tomas Mazukna
Cc: user@flink.apache.org

Hi Tomas,

triggering a batch DataSet job from a DataStream program for each input record doesn't sound like a good idea to me. You would have to make sure that the cluster always has sufficient resources, and you would have to handle failures yourself.

It would be preferable to do all data processing in a single DataStream job. You mentioned that the challenge is to join the data of the files with a JDBC database. I see two ways to do that in a DataStream program:

- Replicate the JDBC table in a stateful operator. This means that you have to publish updates on the database to the Flink program.
- Query the JDBC table with an AsyncFunction. This operator concurrently executes multiple calls to an external service, which improves latency and throughput. The operator ensures that checkpoints and watermarks are correctly handled.

Best, Fabian

2017-10-30 19:11 GMT+01:00 Tomas Mazukna <tomas.mazukna@gmail.com>:

> Trying to figure out the best design in Flink.
> Reading from a Kafka topic whose messages contain pointers to files to be processed.
> I am thinking to somehow kick off a batch job per file... unless there is an easy way to get a separate dataset per file.
> I can do almost all of this in the stream: parse the file with a flat map -> explode its contents into multiple data elements -> map, etc.
> One of these steps would be to grab another dataset from a JDBC source and join it with the stream's contents...
> I think I am mixing the two concepts here, and the right approach would be to kick off this mini batch job per file, where I have a file dataset + a JDBC dataset to join with.
>
> So how would I go about kicking off a batch job from the streaming job?
>
> Thanks,
> Tomas
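To illustrate the first option (replicating the JDBC table in a stateful operator), here is a minimal, Flink-free sketch of the pattern. It assumes a hypothetical change stream of (key, value) updates published from the database; in an actual Flink job the replica would live in keyed state and the updates would arrive on a second, connected stream (e.g. via a CoProcessFunction), not in a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the "replicated table" pattern: a stateful operator keeps a
// local copy of the JDBC table, fed by a change stream, and enriches
// incoming records with a local lookup instead of querying the database.
public class ReplicatedTableJoin {

    // Local replica of the JDBC table (in Flink: keyed state, not a HashMap).
    private final Map<String, String> replica = new HashMap<>();

    // Apply an update published from the database (insert/update by key).
    public void onTableUpdate(String key, String value) {
        replica.put(key, value);
    }

    // Enrich a stream record by joining it against the local replica.
    public Optional<String> enrich(String key, String payload) {
        String side = replica.get(key);
        return side == null
                ? Optional.empty()                 // no matching row (yet)
                : Optional.of(payload + "|" + side);
    }

    public static void main(String[] args) {
        ReplicatedTableJoin op = new ReplicatedTableJoin();
        op.onTableUpdate("42", "customer-A");
        System.out.println(op.enrich("42", "order-1").orElse("n/a"));
    }
}
```

The trade-off of this approach is freshness: the operator only sees rows for which an update has already been published, so records arriving before the corresponding table update find no match.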
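The second option corresponds to Flink's AsyncFunction, applied with AsyncDataStream.unorderedWait/orderedWait. The core idea (several external lookups in flight at once instead of one blocking JDBC round trip per record) can be sketched with plain CompletableFutures; the `lookup` function below is a stand-in for the real JDBC query, not Flink's actual API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of the async-lookup pattern behind Flink's AsyncFunction:
// fire several external queries concurrently and collect the results,
// rather than blocking the stream on one database round trip per record.
public class AsyncLookupSketch {

    public static List<String> enrichAll(List<String> keys,
                                         Function<String, String> lookup) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // One in-flight future per record; in Flink, AsyncDataStream
            // additionally caps the number of concurrent requests.
            List<CompletableFuture<String>> futures = keys.stream()
                    .map(k -> CompletableFuture.supplyAsync(() -> lookup.apply(k), pool))
                    .collect(Collectors.toList());
            // Collect results in input order (orderedWait-style).
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the JDBC query.
        Function<String, String> jdbcLookup = k -> k + "->row";
        System.out.println(enrichAll(List.of("a", "b"), jdbcLookup));
    }
}
```

In a real job you would implement asyncInvoke() on Flink's AsyncFunction and complete the result callback from the JDBC client's thread pool; Flink then takes care of checkpointing the in-flight requests and of watermark handling, as mentioned above.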