beam-user mailing list archives

From Eugene Kirpichov <kirpic...@google.com>
Subject Re: CSVSplitter - Splittable DoFn
Date Tue, 24 Apr 2018 22:26:58 GMT
Robert - you're right, but this is a pathological case. It signals that
there *might* be cases where we'll need to scan the whole file. However, for
practical purposes it matters more whether we need to scan the whole
file in *all* (or most) cases - i.e. whether no amount of backward scanning
of a non-pathological file can give us confidence that we've truly located
a record boundary.
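
To make the failure mode concrete, here is a minimal sketch of the kind of
scan in question (my own hypothetical names, untested). It assumes the
starting offset is not inside a quoted field - exactly the assumption a
pathological file breaks:

/** Heuristic: find the first record boundary at or after {@code offset}. */
class BoundaryFinder {
  static long firstBoundaryAtOrAfter(byte[] file, long offset) {
    // Assumption: 'offset' is not inside a quoted field. On a fully-quoted
    // pathological file this is false, and the "boundaries" found here are
    // not real record boundaries.
    boolean inQuotes = false;
    for (long i = offset; i < file.length; i++) {
      byte b = file[(int) i];
      if (b == '"') {
        inQuotes = !inQuotes;
      } else if (b == '\n' && !inQuotes) {
        return i + 1; // a record starts just past an unquoted newline
      }
    }
    return file.length;
  }
}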

On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <robertwb@google.com> wrote:

> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <kirpichov@google.com>
> wrote:
>
> > I think the first question that has to be answered here is: Is it
> possible *at all* to implement parallel reading of RFC 4180?
>
> No. Consider a multi-record CSV file with no quotes. Placing a quote at the
> start and end gives a new CSV file with exactly one element. For example,
> a,b\r\nc,d\r\n is two records, but "a,b\r\nc,d\r\n" - the same bytes
> wrapped in quotes - is a single record containing one field.
>
> > I.e., given a start byte offset, is it possible to reliably locate the
> first record boundary at or after that offset while scanning only a small
> amount of data?
> > If it is possible, then that's what the SDF (or BoundedSource, etc.)
> should do - split into blind byte ranges, and use this algorithm to assign
> consistent meaning to byte ranges.
>
> > To answer your questions 2 and 3: think of it this way.
> > The SDF's ProcessElement takes an element and a restriction.
> > ProcessElement must make only one promise: that it will correctly perform
> exactly the work associated with this element and restriction.
> > The challenge is that the restriction can become smaller while
> ProcessElement runs - in which case, ProcessElement must also do less
> work. This can happen concurrently with ProcessElement running, so really
> the guarantee should be rephrased as "By the time ProcessElement completes,
> it should have performed exactly the work associated with the element and
> tracker.currentRestriction() at the moment of completion".
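> >
> > Concretely, the canonical shape of a ProcessElement that honors this
> contract over an offset range looks roughly like the sketch below (treat
> the exact signatures as approximate; readRecordAt is a hypothetical
> helper):
> >
> > @ProcessElement
> > public void process(ProcessContext c, OffsetRangeTracker tracker) {
> >   // Claim positions one at a time; tryClaim() returns false as soon as
> >   // a concurrent split has shrunk the restriction past this position.
> >   for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> >     c.output(readRecordAt(i)); // hypothetical helper
> >   }
> >   // On return, exactly the work for tracker.currentRestriction() is done.
> > }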
>
> > This is all that is asked of ProcessElement. If Beam decides to ask the
> tracker to split itself into two ranges (making the current one - "primary"
> - smaller, and producing an additional one - "residual"), Beam of course
> takes the responsibility for executing the residual restriction somewhere
> else: it won't be lost.
>
> > E.g. if ProcessElement was invoked with [a, b), but while it was invoked
> it was split into [a, b-100) and [b-100, b), then the current
> ProcessElement call must process [a, b-100), and Beam guarantees that it
> will fire up another ProcessElement call for [b-100, b). (Of course, both
> of these calls may end up being recursively split further.)
>
> > I'm not quite sure what you mean by "recombining" - please let me know if
> the explanation above makes things clear enough or not.
>
> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <pbrumblay@fearlesstg.com>
> wrote:
>
> >> Hi Eugene, thank you for the feedback!
>
> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think it
> can!) - we have a lot of source data with embedded newlines. These records
> get split improperly because TextIO.read() blindly looks for newline
> characters. We need something which natively understands embedded newlines
> in quoted fields ... like so:
>
> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
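> >>
> >> (For reference, Apache Commons CSV does cope with that example when
> handed a plain Reader - a quick sketch, untested:)
> >>
> >> import java.io.StringReader;
> >> import org.apache.commons.csv.CSVFormat;
> >> import org.apache.commons.csv.CSVParser;
> >> import org.apache.commons.csv.CSVRecord;
> >>
> >> public class EmbeddedNewlineDemo {
> >>   public static void main(String[] args) throws Exception {
> >>     String data = "foo,bar,\"this has an\r\nembedded newline\",192928\r\n";
> >>     try (CSVParser parser =
> >>         new CSVParser(new StringReader(data), CSVFormat.RFC4180)) {
> >>       for (CSVRecord record : parser) {
> >>         System.out.println(record.get(2)); // embedded newline intact
> >>       }
> >>     }
> >>   }
> >> }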
>
> >> As for the other feedback:
>
> >> 1. Claiming the entire range - yes, I figured this was a major mistake.
> Thanks for the confirmation.
> >> 2. The code for initial splitting of the restriction seems very
> complex...
>
> >> Follow-up question: if I process (and claim) only a subset of a range,
> say [a, b - 100), and [b - 100, b) represents an incomplete block, will
> Beam SDF dynamically recombine ranges such that [b - 100, b + N) is sent to
> a worker with a (potentially) complete block?
>
> >> 3. Fine-tuning the evenness ... if Beam SDF recombines ranges for
> split blocks, then it sounds like arbitrary splits in splitFunction() make
> more sense.
>
> >> I'll try to take another pass at this with your feedback in mind.
>
> >> Peter
>
>
>
> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <kirpichov@google.com>
> wrote:
>
> >>> Hi Peter,
>
> >>> Thanks for experimenting with SDF! However, in this particular case:
> any reason why you can't just use TextIO.read() and parse each line as CSV?
> Seems like that would require considerably less code.
>
> >>> A few comments on this code per se:
> >>> - The ProcessElement implementation immediately claims the entire
> range, which means that there can be no dynamic splitting and the code
> behaves equivalently to a regular DoFn
> >>> - The code for initial splitting of the restriction seems very complex
> - can you just split it blindly into a bunch of byte ranges of about equal
> size? Looking at the actual data while splitting should never be necessary
> - you should be able to just look at the file size (say, 100MB) and split
> it into a bunch of splits, say, [0, 10MB), [10MB, 20MB), etc. (see the
> sketch after this list)
> >>> - It seems that the splitting code tries to align splits with record
> boundaries - this is not useful: it does not matter whether the split
> boundaries fall onto record boundaries or not; instead, the reading code
> should be able to read an arbitrary range of bytes in a meaningful way.
> That typically means that reading [a, b) means: start at the first record
> boundary located at or after a, and end at the first record boundary
> located at or after b.
> >>> - Fine-tuning the evenness of initial splitting is also not useful:
> dynamic splitting will even things out anyway; moreover, even if you are
> able to achieve an equal amount of data read by different restrictions, it
> does not translate into equal time to process the data with the ParDos
> fused into the same bundle (and that time is unpredictable).
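> >>>
> >>> Roughly, the two pieces - blind splitting plus boundary-aligned reading
> - fit together as in this sketch (signatures approximate;
> @GetInitialRestriction etc. omitted; findRecordBoundaryAtOrAfter,
> readRecordAt and CsvRecord are placeholders you would implement):
> >>>
> >>> class ReadCsvFn extends DoFn<String, String> {
> >>>
> >>>   @SplitRestriction
> >>>   public void splitRestriction(
> >>>       String file, OffsetRange range, OutputReceiver<OffsetRange> out) {
> >>>     // Blind split into ~10MB byte ranges; never looks at the data.
> >>>     for (OffsetRange part : range.split(10 * 1024 * 1024, 1024 * 1024)) {
> >>>       out.output(part);
> >>>     }
> >>>   }
> >>>
> >>>   @ProcessElement
> >>>   public void process(ProcessContext c, OffsetRangeTracker tracker)
> >>>       throws IOException {
> >>>     OffsetRange range = tracker.currentRestriction();
> >>>     // [a, b) means: all records starting at or after the first record
> >>>     // boundary >= a and strictly before the first record boundary >= b.
> >>>     long pos = findRecordBoundaryAtOrAfter(c.element(), range.getFrom());
> >>>     while (tracker.tryClaim(pos)) {
> >>>       CsvRecord record = readRecordAt(c.element(), pos); // placeholder
> >>>       c.output(record.text);
> >>>       // The last claimed record may extend past range.getTo(); the next
> >>>       // tryClaim() then returns false and the loop ends.
> >>>       pos = record.nextOffset;
> >>>     }
> >>>   }
> >>> }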
>
>
> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay <pbrumblay@fearlesstg.com>
> wrote:
>
> >>>> Hi All,
>
> >>>> I noticed that there is no support for CSV file reading (e.g. RFC 4180)
> in Apache Beam - at least no native transform. There's an issue to add this
> support: https://issues.apache.org/jira/browse/BEAM-51.
>
> >>>> I've seen examples which use the Apache Commons CSV parser. I took a
> shot at implementing a SplittableDoFn transform. I have the full code and
> some questions in a gist here:
> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.
>
> >>>> I suspect it could be improved quite a bit. If anyone has time to
> provide feedback I would really appreciate it.
>
> >>>> Regards,
>
> >>>> Peter Brumblay
> >>>> Fearless Technology Group, Inc.
>
