From: Elad Rosenheim <elad@dynamicyield.com>
Date: Thu, 27 May 2021 09:21:45 +0300
Subject: Re: Long-Running Continuous Data Saving to File
To: user@arrow.apache.org

I want to add a few notes from my experience with Kafka:

1. There's an ecosystem - having battle-tested consumers that write to various external systems, with known reliability guarantees, is very helpful. It's also then possible to have multiple consumers - some batch, some real-time streaming (e.g. Apache Flink) or analytics (ksqlDB). People have already given thought to schema evolution and whatnot, as Weston noted.

2. In terms of operations - yup, it wasn't as easy as I'd hoped (mostly when servers crash and stuff like that). We also have a component that runs on a VM with Kafka running locally on that machine, used to buffer downstream writes. That's also a possible setup - you don't *have* to have a cluster. In this mode the buffer's durability is tied to the machine staying up and the size of the local disk, but it can also "just work" for years.

btw, Parquet not supporting append is in line with the bigger picture: HDFS and object stores (e.g. S3) don't really support appending to files either (I think HDFS now does?), and many "big data" databases are based on LSM trees, which generally do not require appending to existing files. So the whole assumption is compaction (and then cleanup) / re-partitioning, not appending to already-written files.
Elad

On Thu, May 27, 2021 at 1:07 AM Weston Pace <weston@ursacomputing.com> wrote:
Elad's advice is very helpful. This is not a problem that Arrow solves today (to the best of my knowledge). It is a topic that comes up periodically[1][2][3]. If a crash happens while your parquet stream writer is open then the most likely outcome is that you will be missing the footer (this gets written on close) and be unable to read the file (although it could presumably be recovered). The parquet format may be able to support an append mode but readers don't typically support it.
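
For illustration, a minimal sketch of that "close early and often" idea, using pyarrow rather than the C++ API the original question is about (the schema and file path here are made up):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical schema; in practice it comes from your incoming stream.
schema = pa.schema([("ts", pa.timestamp("us")), ("value", pa.float64())])

def write_chunk(table: pa.Table, path: str) -> None:
    # The Parquet footer is only written on close(), so a file whose
    # writer never reached close() is generally unreadable. Closing and
    # rotating to a new file frequently bounds how much a crash can lose.
    writer = pq.ParquetWriter(path, schema)
    try:
        writer.write_table(table)
    finally:
        writer.close()
```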

I believe a common approach to this problem is to dump out lots of small files as the data arrives and then periodically batch them together. Kafka is a great way to do this but it could be done with a single process as well. If you go very far down this path you will likely run into concerns like durability and schema evolution so I don't mean to imply that it is trivial :)

[1] https://stackoverflow.com/questions/47113813/using-pyarrow-how-do-you-append-to-parquet-file
[2] https://issues.apache.org/jira/browse/PARQUET-1154
[3] https://lists.apache.org/thread.html/r7efad314abec0219016886eaddc7ba79a451087b6324531bdeede1af%40%3Cdev.arrow.apache.org%3E
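
A rough sketch of that "write many small files, batch them up later" pattern, again with pyarrow and invented paths; it assumes a day's worth of data fits in memory (otherwise the rewrite can be done in chunks):

```python
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Gather all the small files written to one day's landing directory
# and rewrite them as a single larger file.
small_files = ds.dataset("landing/2021-05-26/", format="parquet")
pq.write_table(small_files.to_table(), "compacted/2021-05-26.parquet")
```
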
On Wed, May 26, 2021 at 7:39 AM Elad Rosenheim <elad@dynamicyield.com> wrote:
Hi,

While I'm not using the C++ version of Arrow, the issue you're talking about is a very common concern.

There are a few points to discuss here:

1. Generally, Parquet files cannot be appended to. You could of course load the file into memory, add more information and re-save it, but that's not really what you're looking for... Tools like `parquet-tools` can concatenate files by creating a new file with two (or more) row groups, but that's not a very good solution either. Having multiple row groups in a single file is sometimes desirable, but in this case it would most likely just produce a less well-compressed file.
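
To make the row-group point concrete, here is a rough pyarrow sketch of that kind of concatenation (file names invented); each input ends up as its own row group(s) in the output, which is why the result is usually less well compressed than a full rewrite:

```python
import pyarrow.parquet as pq

small_files = ["part-0001.parquet", "part-0002.parquet"]  # made-up names
schema = pq.read_schema(small_files[0])

# Each write_table() call adds new row group(s) to the still-open file;
# nothing is ever appended to an already-closed Parquet file.
with pq.ParquetWriter("merged.parquet", schema) as writer:
    for path in small_files:
        writer.write_table(pq.read_table(path))
```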

2. The other concern is reliability - having a process that holds a big batch in memory and then spills it to disk every X minutes/rows/bytes is bound to have issues when things crash/get stuck/need to go down for maintenance. You probably want to get as close to "exactly once" guarantees as possible (the holy grail...). One common solution is to write to Kafka, and have a consumer that periodically reads a batch of messages and stores them to file. This is nowadays provided by Kafka Connect, thankfully. Anyway, the "exactly once" part stops at this point, and for anything that happens downstream you'd need to handle that yourself.
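
Kafka Connect is the off-the-shelf way to do this; purely as an illustration of the idea, a hand-rolled consumer along these lines (broker, group and topic names are made up, and it only gives at-least-once delivery) could look like:

```python
import time
import pyarrow as pa
import pyarrow.parquet as pq
from confluent_kafka import Consumer  # assumes the confluent-kafka client

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # made-up broker address
    "group.id": "parquet-sink",             # made-up consumer group
    "enable.auto.commit": False,
})
consumer.subscribe(["events"])              # made-up topic name

batch, started = [], time.time()
while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        # In practice you'd parse the payload into proper columns.
        batch.append(msg.value().decode("utf-8"))
    # Flush roughly every 5 minutes; offsets are committed only after
    # the file is written, so a crash re-reads at most one batch.
    if batch and time.time() - started > 300:
        pq.write_table(pa.table({"raw_json": batch}),
                       f"landing/batch-{int(started)}.parquet")
        consumer.commit(asynchronous=False)
        batch, started = [], time.time()
```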

3. Then, you're back to the question of many, many files per day... there is no magical solution to this. You may need a scheduled task that reads the files every X hours (or every day?) and re-partitions the data in the way that makes the most sense for later processing/querying - perhaps by date, perhaps by customer, or both. There are various tools that help with this.
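
For example, such a periodic re-partitioning pass could be sketched with pyarrow datasets (the directory layout and the partition column are invented for the example):

```python
import pyarrow.dataset as ds

# Read a day's accumulated files and rewrite them partitioned by a
# column that matches later query patterns (e.g. customer).
raw = ds.dataset("landing/2021-05-26/", format="parquet")
ds.write_dataset(
    raw,
    base_dir="curated/",
    format="parquet",
    partitioning=["customer"],        # hypothetical column name
    partitioning_flavor="hive",       # writes curated/customer=<value>/...
)
```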

Elad

On Wed, May 26, 2021 at 7:32 PM Xander Dunn <xander@xander.ai> wrote:
I have a very long-running (months) program that is streaming in data continually, processing it, and saving it to file using Arrow. My current solution is to buffer several million rows and write them to a new .parquet file each time. This works, but produces 1000+ files every day.

If I could, I would just append to the same file for each day. I see an `arrow::fs::FileSystem::OpenAppendStream` - what file formats does this work with? Can I append to .parquet or .feather files? Googling seems to indicate these formats can't be appended to.

Using the `parquet::StreamWriter`, could I continually stream rows to a single file throughout the day? What happens if the program is unexpectedly terminated? Would everything in the currently open monolithic file be lost? I would be streaming rows to a single .parquet file for 24 hours.

Thanks,
Xander