beam-user mailing list archives

From Jean-Baptiste Onofré <...@nanthrax.net>
Subject Re: Issues with simple KafkaIO-read pipeline -- where to write?
Date Tue, 20 Sep 2016 01:22:29 GMT
Hi Dan,

thanks again for the detailed explanation.

I will prepare some questions for you ;)

Regards
JB

On 09/20/2016 01:37 AM, Dan Halperin wrote:
> Hey folks,
>
> Sorry for the confusion around sinks. Let me see if I can clear things up.
>
> In Beam, a Source+Reader is an integral part of the model. A source
> is the root of a pipeline, and it is where runners can do lots of
> important things like creating bundles, producing watermarks, and taking
> checkpoints.
>
> So we initially thought that we should have a generic Write transform
> that would encapsulate common patterns when outputting data. We started
> with Write+Sink.
>
> The Write+Sink class is simply a wrapper for the pattern (1-time global
> initialization, parallel writing, 1-time global finalization), but it is
> nothing more than a bunch of ParDos wired together by side inputs for
> control flow. However, this pattern ONLY applies naturally to
> PCollections that trigger exactly 1 time across all windows -- really,
> only bounded PCollections. So it simply does not work with unbounded
> PCollections.
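>
> To make that concrete, here is a rough sketch of the wiring (the shape
> only, not the actual Write code; p is the Pipeline, input is the
> PCollection being written, and InitFn, WriteBundleFn and FinalizeFn are
> hypothetical DoFns standing in for the Sink's initialize/write/finalize
> steps):
>
>   // 1-time global initialization (e.g. create a temp directory)
>   PCollection<String> init =
>       p.apply(Create.of("init")).apply(ParDo.of(new InitFn()));
>   PCollectionView<String> initView = init.apply(View.<String>asSingleton());
>
>   // parallel writing: each bundle writes, taking the init result as a
>   // side input purely for control flow
>   PCollection<String> written = input.apply(
>       ParDo.of(new WriteBundleFn(initView)).withSideInputs(initView));
>
>   // 1-time global finalization: sees *all* write results at once through
>   // a side input -- which is why this only works when the collection
>   // triggers exactly once, i.e. for bounded input
>   PCollectionView<Iterable<String>> writtenView =
>       written.apply(View.<String>asIterable());
>   p.apply(Create.of("finalize"))
>       .apply(ParDo.of(new FinalizeFn(writtenView)).withSideInputs(writtenView));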
>
> Over time, we have learned that this was not a very general pattern.
> Specifically, I believe there is exactly 1 use of the Sink today, which
> is for files (FileBasedSink, and also HadoopFileSink). The global
> patterns for FileBasedSink look like (1 time, create temp directory ;
> many times, write a temp file ; 1 time, rename and renumber all files).
>
> Most other places we write data need less structure. Kafka, Google Cloud
> Datastore, Google Cloud Bigtable -- most of the time you simply insert
> one record or a bundle of records in a transaction, but you don't have
> global multi-transaction support across many bundles. These are
> implemented purely as DoFns, because we can encapsulate bundle-based
> batching or transactions at that level. See BigtableIO
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L538>
> or DatastoreIO
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L896>
> for examples.
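>
> As a hedged illustration of that style (not the actual BigtableIO or
> DatastoreIO code; MyRecord and MyClient are made-up stand-ins for a
> record type and datastore client, and the exact DoFn method signatures
> vary slightly across Beam versions):
>
>   // imports assumed: org.apache.beam.sdk.transforms.DoFn, java.util.*
>   static class BatchingWriteFn extends DoFn<MyRecord, Void> {
>     private transient MyClient client;
>     private transient List<MyRecord> batch;
>
>     @StartBundle
>     public void startBundle() {
>       client = MyClient.connect();   // one connection per bundle
>       batch = new ArrayList<>();
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       batch.add(c.element());        // buffer within the bundle
>       if (batch.size() >= 500) {     // flush in small transactions
>         client.writeAll(batch);
>         batch.clear();
>       }
>     }
>
>     @FinishBundle
>     public void finishBundle() {
>       if (!batch.isEmpty()) {
>         client.writeAll(batch);      // flush whatever is left in the bundle
>       }
>       client.close();
>     }
>   }
>
> Batching and transactions are scoped to a single bundle here, so there is
> no global finalization step, and the same DoFn works for bounded and
> unbounded input.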
>
> Other times, we actually want *more* structure. For BigQueryIO in
> bounded mode, we: 1) write all temp files in parallel ; 2) 1-time globally
> break the temp files into groups that fit within 1 BigQuery import job ;
> 3) create multiple temp tables in BigQuery in parallel, one per group ;
> 4) concatenate all the temp tables into the final table ; 5) delete all
> the temp files and the temp tables.
>
> The fact that Write+Sink has not proved that useful has slowed down our
> movement on any sort of unbounded sink or streaming sink. Instead, I
> think it makes more sense to identify the important patterns and build
> to those.
>
> The Flink Rolling[File]Sink is a specific kind of sink that we should
> incorporate into the SDK core alongside TextIO. But the key is that
> RollingSink is not "streaming TextIO.Write" - it's actually a file sink
> with specific semantics that are different than those of TextIO.Write.
>
> Similarly, we could extend TextIO.Write to support a file pattern like
> "-WW-SS-of-NN-GG", where "WW" is based on the window, "SS of NN" is the
> index of a particular shard within a write, and "GG" is the generation
> number, aka the number of times this Window+Shard has triggered (based
> on PaneInfo). And of course, to support this we'd have to force a fixed
> sharding pattern all the time when writing, which hurts efficiency and
> runner autotuning. In general, this is hard to do right in a way that
> makes sense with late data, accumulating mode, etc.
>    (If you could enforce 1 triggering per window, we could drop the -GG.)
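>
> For reference, the window and pane information such a pattern needs is
> already visible inside a DoFn; a sketch (formatFilename and the shard
> numbering are hypothetical):
>
>   @ProcessElement
>   public void processElement(ProcessContext c, BoundedWindow window) {
>     PaneInfo pane = c.pane();
>     String ww = window.maxTimestamp().toString(); // "WW": identifies the window
>     long gg = pane.getIndex();                    // "GG": firings so far for this window
>     // "SS-of-NN" would require forcing a fixed sharding upstream, e.g. by
>     // keying on an assigned shard number -- that's the efficiency cost
>     String filename = formatFilename(ww, gg);
>     // ... write c.element() under filename ...
>   }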
>
> Hope that helps explain why we don't have any awesome streaming sinks
> yet in the SDK. However, we'd love for someone to start a Beam port of
> RollingFileSink or something similar that has sensible semantics in the
> presence of windowing, triggers, multiple firings, accumulating mode...
> (Possibly just rejecting some inputs it can't handle).
>
> Thanks!
> Dan
>
>
>
> On Mon, Sep 19, 2016 at 2:11 PM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
>
>     Hi Thomas,
>
>     thanks for the update.
>
>     Stupid question: right now, most of the IO sinks use a simple DoFn
>     (to write the PCollection elements).
>
>     Does that mean that, when possible, we should use the Sink abstract
>     class, with a Writer that can write bundles (using open(), write(),
>     close() methods)?
>     In that case, will the window create the bundle?
>
>     Regards
>     JB
>
>     On 09/19/2016 10:46 PM, Thomas Groh wrote:
>
>         The model doesn't support dynamically creating PCollections, so the
>         proposed transform producing a Bounded PCollection from a window
>         of an
>         Unbounded PCollection means that you end up with a Bounded
>         Pipeline - in
>         which case it is preferable to use a Bounded Read instead. As you're
>         reading from an unbounded input, it may be necessary to write the
>         windowed values to a sink that supports continuous updates, from
>         which
>         you can execute a Bounded Pipeline over the appropriate set of input
>         instead of on some arbitrary chunk of records (e.g. the window).
>
>         The current Write implementation works effectively only for
>         sinks that
>         can assume they can do something exactly once and finish. This
>         is not an
>         assumption that can be made within arbitrary pipelines. Sinks that
>         currently only work for Bounded inputs are generally written with
>         assumptions that mean they do not work in the presence of multiple
>         trigger firings, late data, or across multiple windows, and handling
>         these facets of the model requires different behavior or
>         configuration
>         from different sinks.
>
>         On Mon, Sep 19, 2016 at 9:53 AM, Emanuele Cesena
>         <emanuele@shopkick.com> wrote:
>
>             Oh yes, even better I’d say.
>
>             Best,
>
>
>             > On Sep 19, 2016, at 9:48 AM, Jean-Baptiste Onofré
>         <jb@nanthrax.net> wrote:
>             >
>             > Hi Emanuele,
>             >
>             > +1 to support unbounded sinks, but also, a very convenient
>             feature would be a window to create a bounded collection as a
>             subset of an unbounded collection.
>             >
>             > Regards
>             > JB
>             >
>             > On 09/19/2016 05:59 PM, Emanuele Cesena wrote:
>             >> Hi,
>             >>
>             >> This is a great insight. Is there any plan to support
>             unbounded sinks in Beam?
>             >>
>             >> On the temp Kafka->Kafka solution, this is exactly what we’re
>             doing (and I wish to change). We have production stream pipelines
>             that are Kafka->Kafka. Then we have two main use cases: Kafka
>             Connect to dump into Hive and go batch from there, and Druid for
>             real-time reporting.
>             >>
>             >> However, this makes prototyping really slow, and I wanted to
>             introduce Beam to shortcut from Kafka to anywhere.
>             >>
>             >> Best,
>             >>
>             >>
>             >>> On Sep 18, 2016, at 10:38 PM, Aljoscha Krettek
>             <aljoscha@apache.org> wrote:
>             >>>
>             >>> Hi,
>             >>> right now, writing to a Beam "Sink" is only supported for
>             bounded streams, as you discovered. An unbounded stream cannot
>             be transformed into a bounded stream using a window; that will
>             just "chunk" the stream differently, but it will still be
>             unbounded.
>             >>>
>             >>> The options you have right now for writing are to write to
>             your external datastore using a DoFn, to use KafkaIO to write to
>             a Kafka topic, or to use UnboundedFlinkSink to wrap a Flink Sink
>             for use in a Beam pipeline. The latter would allow you to use,
>             for example, BucketingSink or RollingSink from Flink. I'm only
>             mentioning UnboundedFlinkSink for completeness; I would not
>             recommend using it, since your program would then only work on
>             the Flink runner. The way to go, IMHO, would be to write to
>             Kafka and then take the data from there and ship it to some
>             final location such as HDFS.
>             >>>
>             >>> Cheers,
>             >>> Aljoscha
>             >>>
>             >>> On Sun, 18 Sep 2016 at 23:17 Emanuele Cesena
>             <emanuele@shopkick.com> wrote:
>             >>> Thanks, I’ll look into it, even if it’s not really the
>             feature I need (exactly because it will stop execution).
>             >>>
>             >>>
>             >>>> On Sep 18, 2016, at 2:11 PM, Chawla,Sumit
>             <sumitkchawla@gmail.com> wrote:
>             >>>>
>             >>>> Hi Emanuele
>             >>>>
>             >>>> KafkaIO supports withMaxNumRecords(X), which will
>             create a bounded source from Kafka. However, the pipeline will
>             finish once X records have been read.
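>             >>>>
>             >>>> For reference, a rough sketch of that in the Beam Java SDK
>             (broker address, topic name, and exact method names are
>             assumptions; they vary a bit across Beam versions):
>
>               pipeline.apply(KafkaIO.read()
>                   .withBootstrapServers("broker:9092")    // assumed broker
>                   .withTopics(Arrays.asList("my-topic"))  // assumed topic
>                   .withMaxNumRecords(1000)                // makes the source bounded
>                   .withoutMetadata());                    // yields KV<byte[], byte[]> records
>
>             The pipeline then behaves as a bounded one and stops after
>             those records are read, which is handy for prototyping but not
>             for a long-running job.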
>             >>>>
>             >>>> Regards
>             >>>> Sumit Chawla
>             >>>>
>             >>>>
>             >>>> On Sun, Sep 18, 2016 at 2:00 PM, Emanuele Cesena
>             <emanuele@shopkick.com> wrote:
>             >>>> Hi,
>             >>>>
>             >>>> Thanks for the hint - I’ll debug better, but I thought I
>             did that:
>             >>>>
>             https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java#L140
>             >>>>
>             >>>> Best,
>             >>>>
>             >>>>
>             >>>>> On Sep 18, 2016, at 1:54 PM, Jean-Baptiste Onofré
>             <jb@nanthrax.net> wrote:
>             >>>>>
>             >>>>> Hi Emanuele
>             >>>>>
>             >>>>> You have to use a window to create a bounded
>         collection from
>             an unbounded source.
>             >>>>>
>             >>>>> Regards
>             >>>>> JB
>             >>>>>
>             >>>>> On Sep 18, 2016, at 21:04, Emanuele Cesena
>             <emanuele@shopkick.com> wrote:
>             >>>>> Hi,
>             >>>>>
>             >>>>> I wrote a while ago about a simple example I was
>         building to
>             test KafkaIO:
>             >>>>>
>
>         https://github.com/ecesena/beam-starter/blob/master/src/main/java/com/dataradiant/beam/examples/StreamWordCount.java
>             >>>>>
>             >>>>> Issues with Flink should be fixed now, and I’m trying to
>             run the example on master and Flink 1.1.2.
>             >>>>> I’m currently getting:
>             >>>>> Caused by: java.lang.IllegalArgumentException: Write
>         can only
>             be applied to a Bounded PCollection
>             >>>>>
>             >>>>> What is the recommended way to go here?
>             >>>>> - is there a way to create a bounded collection from an
>             unbounded one?
>             >>>>> - is there a plan to let TextIO write unbounded
>             collections?
>             >>>>> - is there another recommended “simple sink” to use?
>             >>>>>
>             >>>>> Thank you much!
>             >>>>>
>             >>>>> Best,
>             >>>>
>             >>>> --
>             >>>> Emanuele Cesena, Data Eng.
>             >>>> http://www.shopkick.com
>             >>>>
>             >>>> Il corpo non ha ideali
>             >>>>
>             >>>>
>             >>>>
>             >>>>
>             >>>>
>             >>>
>             >>> --
>             >>> Emanuele Cesena, Data Eng.
>             >>> http://www.shopkick.com
>             >>>
>             >>> Il corpo non ha ideali
>             >>>
>             >>>
>             >>>
>             >>>
>             >>
>             >
>             > --
>             > Jean-Baptiste Onofré
>             > jbonofre@apache.org
>             > http://blog.nanthrax.net
>             > Talend - http://www.talend.com
>
>             --
>             Emanuele Cesena, Data Eng.
>             http://www.shopkick.com
>
>             Il corpo non ha ideali
>
>
>
>
>
>
>     --
>     Jean-Baptiste Onofré
>     jbonofre@apache.org
>     http://blog.nanthrax.net
>     Talend - http://www.talend.com
>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
