apex-dev mailing list archives

From Devendra Tagare <devend...@datatorrent.com>
Subject Re: Parquet Writer Operator
Date Fri, 17 Jun 2016 04:55:26 GMT
Hi,

WAL based approach :

The FileSystemWAL.FileSystemWALWriter closes a temporary file only after
the window is committed; none of these files can be read before that point.
Once a file is committed, the ParquetOutputOperator will have to read the
committed files in the same committed callback, convert the spooled records
from the WAL to Parquet, and then write the Parquet file.
A file can only be deleted from the WAL after it has been successfully
written as a Parquet file.
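
A minimal sketch of that committed-callback flow, with the caveat that
readSpooledRecords() and deleteFromWAL() are placeholders for the actual
FileSystemWAL reader calls, and writer is a ParquetWriter<GenericRecord>
built elsewhere:

@Override
public void committed(long windowId)
{
  try {
    // Replay the records the WAL committed for this window ...
    for (GenericRecord record : readSpooledRecords(windowId)) {
      writer.write(record);
    }
    writer.close();
    // ... and drop them from the WAL only after the Parquet write succeeded.
    deleteFromWAL(windowId);
  } catch (IOException e) {
    // Keep the WAL files around so this window can be retried (see below).
    throw new RuntimeException(e);
  }
}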

Small files problem : to handle this with a WAL-based approach, we will
have to keep reading files from the WAL until the Parquet block size is
reached. This means the WAL reader could end up polling files from windows
before the highest committed window, since the block size may not have been
reached in the committed callback for a given window.

Fault tolerance : if the Parquet writes to the file system fail, the
operator will go down. In this case we will have to add retry logic to
re-read the files from the WAL for the windows that failed.
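
A sketch of that retry bookkeeping (pendingWindows and writeParquetFor()
are assumptions, not existing Malhar APIs):

private final SortedSet<Long> pendingWindows = new TreeSet<Long>();

@Override
public void committed(long windowId)
{
  pendingWindows.add(windowId);
  // Retry, in order, every window whose WAL files have not yet been
  // converted; earlier windows may still be pending because of a failure
  // or because the block size was not reached (see above).
  for (Iterator<Long> it = pendingWindows.iterator(); it.hasNext(); ) {
    long w = it.next();
    try {
      writeParquetFor(w);  // assumed: replays WAL records for window w into Parquet
      it.remove();         // forget a window only after its Parquet write succeeded
    } catch (IOException e) {
      break;               // leave it pending and retry on the next committed callback
    }
  }
}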

Please let me know if I am missing something in using the WAL, and also
whether a 2-operator solution would be better suited in this case.

Thanks,
Dev


On Wed, Jun 15, 2016 at 5:02 PM, Thomas Weise <thomas@datatorrent.com>
wrote:

> Hi Dev,
>
> Can you not use the existing WAL implementation (via WindowDataManager or
> directly)?
>
> Thomas
>
>
> On Wed, Jun 15, 2016 at 3:47 PM, Devendra Tagare <devendrat@datatorrent.com>
> wrote:
>
> > Hi,
> >
> > Initial thoughts were to go for a WAL-based approach where the operator
> > would first write POJOs to the WAL, and a separate thread would then do
> > the task of reading from the WAL and writing the destination files based
> > on the block size.
> >
> > There is a ticket open for a pluggable spooling implementation with
> > output operators which can be leveraged for this,
> > https://issues.apache.org/jira/browse/APEXMALHAR-2037
> >
> > Since work is already being done on that front, we can plug in the
> > spooler with the existing implementation of the ParquetFileWriter at that
> > point and remove the first operator - ParquetFileOutputOperator.
> >
> > Thanks,
> > Dev
> >
> > On Tue, Jun 14, 2016 at 7:21 PM, Thomas Weise <thomas@datatorrent.com>
> > wrote:
> >
> > > What's the reason for not considering the WAL based approach?
> > >
> > > What are the pros and cons?
> > >
> > >
> > > On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <devendrat@datatorrent.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > We can focus on the 2 problems below:
> > > > 1. Avoid the small-files problem, which could arise due to a flush at
> > > > every endWindow, since there wouldn't be significant data in a window.
> > > > 2. Fault tolerance.
> > > >
> > > > *Proposal* : Create a module in which there are 2 operators.
> > > >
> > > > *Operator 1 : ParquetFileOutputOperator*
> > > > This operator will be an implementation of the AbstractFileOutputOperator.
> > > > It will write data to an HDFS location and leverage the fault-tolerance
> > > > semantics of the AbstractFileOutputOperator.
> > > >
> > > > This operator will implement the CheckpointNotificationListener and
> > > > will emit the finalized files from the beforeCheckpoint method as a
> > > > Map<windowId, Set<files finalized in the window>>. A sketch follows below.
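> > > >
> > > > A minimal sketch of that hook (the finalizedFiles bookkeeping and the
> > > > output port are assumptions, not existing AbstractFileOutputOperator
> > > > state):
> > > >
> > > > import java.util.*;
> > > > import com.datatorrent.api.DefaultOutputPort;
> > > > import com.datatorrent.api.Operator.CheckpointNotificationListener;
> > > > import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
> > > >
> > > > public class ParquetFileOutputOperator extends AbstractFileOutputOperator<byte[]>
> > > >     implements CheckpointNotificationListener
> > > > {
> > > >   // windowId -> files finalized in that window (assumed bookkeeping)
> > > >   private final Map<Long, Set<String>> finalizedFiles = new HashMap<>();
> > > >   public final transient DefaultOutputPort<Map<Long, Set<String>>>
> > > >       finalizedFilesPort = new DefaultOutputPort<>();
> > > >
> > > >   @Override
> > > >   public void beforeCheckpoint(long windowId)
> > > >   {
> > > >     // Emit whatever was finalized since the last checkpoint so that
> > > >     // the ParquetFileWriter can start converting it.
> > > >     finalizedFilesPort.emit(new HashMap<>(finalizedFiles));
> > > >     finalizedFiles.clear();
> > > >   }
> > > >
> > > >   @Override
> > > >   public void checkpointed(long windowId) { }
> > > >
> > > >   @Override
> > > >   public void committed(long windowId) { }
> > > >
> > > >   @Override
> > > >   protected String getFileName(byte[] tuple) { return "parquet-staging"; } // illustrative
> > > >
> > > >   @Override
> > > >   protected byte[] getBytesForTuple(byte[] tuple) { return tuple; }
> > > > }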
> > > >
> > > > *Operator 2 : ParquetFileWriter*
> > > > This operator will receive a Set<files finalized in the window> from
> > > > the ParquetFileOutputOperator on its input port.
> > > > Once it receives this map, it will do the following:
> > > >
> > > > 1. Save the input received to a Map<windowId, Set<InputFiles>> inputFilesMap.
> > > >
> > > > 2. Instantiate a new ParquetWriter:
> > > >    2.a. Get a unique file name.
> > > >    2.b. Add a configurable writer that extends ParquetWriter and includes
> > > > a write support for writing the various supported formats like Avro, Thrift, etc.
> > > >
> > > > 3. For each file from the inputFilesMap:
> > > >    3.a. Read the file and write the records using the writer created in (2).
> > > >    3.b. Check if the (configurable) block size is reached. If yes, close
> > > > the file, add its entry to a Map<windowId, CompletedFiles> completedFilesMap,
> > > > and remove the entry from inputFilesMap.
> > > >         If the writes fail, the files can be reprocessed from the inputFilesMap.
> > > >    3.c. In the committed callback, remove the completed files from the
> > > > directory and prune the completedFilesMap for that window. (A sketch of
> > > > this flow follows below.)
> > > >
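> > > > A rough sketch of that per-file loop; readRecords(), estimatedSizeOf(),
> > > > newWriter(), uniqueFileName() and deleteStagingFiles() are hypothetical
> > > > helpers, and the writer and blockSize fields are assumed to exist on the
> > > > operator:
> > > >
> > > > private final Map<Long, Set<String>> inputFilesMap = new HashMap<>();
> > > > private final Map<Long, Set<String>> completedFilesMap = new HashMap<>();
> > > > private long approxBytes = 0;
> > > >
> > > > private void convert(long windowId) throws IOException
> > > > {
> > > >   Iterator<String> it = inputFilesMap.get(windowId).iterator();
> > > >   while (it.hasNext()) {
> > > >     String file = it.next();
> > > >     for (GenericRecord record : readRecords(file)) { // hypothetical replay of one finalized file
> > > >       writer.write(record);
> > > >       approxBytes += estimatedSizeOf(record);        // hypothetical; the check is only approximate
> > > >     }
> > > >     completedFilesMap.computeIfAbsent(windowId, w -> new HashSet<String>()).add(file);
> > > >     it.remove(); // if a write failed above we never get here, so the file stays for reprocessing
> > > >     if (approxBytes >= blockSize) {                  // configurable block size
> > > >       writer.close();                                // roll over to a new Parquet file
> > > >       writer = newWriter(uniqueFileName());
> > > >       approxBytes = 0;
> > > >     }
> > > >   }
> > > > }
> > > >
> > > > @Override
> > > > public void committed(long windowId)
> > > > {
> > > >   // 3.c: drop the staging files once the window is committed
> > > >   Set<String> done = completedFilesMap.remove(windowId);
> > > >   if (done != null) {
> > > >     deleteStagingFiles(done); // hypothetical cleanup helper
> > > >   }
> > > > }
> > > >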
> > > > Points to note:
> > > > 1. The block-size check will be approximate, since the data is in
> > > > memory and ParquetWriter does not expose a flush.
> > > > 2. This is at best a temporary implementation in the absence of a
> > > > WAL-based approach.
> > > >
> > > > I would like to take a crack at this operator based on community feedback.
> > > >
> > > > Thoughts?
> > > >
> > > > Thanks,
> > > > Dev
> > > >
> > > > On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <tushar@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi Shubham,
> > > > >
> > > > > +1 for the Parquet writer.
> > > > >
> > > > > I doubt we could leverage the recovery mechanism provided by
> > > > > AbstractFileOutputOperator, as the Parquet writer does not expose a
> > > > > flush and could write to the underlying stream at any time. To simplify
> > > > > recovery, you can write a single file in each checkpoint duration
> > > > > (sketched below). If this is not an option, then you need to make use
> > > > > of a WAL for recovery, and not use operator checkpointing for storing
> > > > > the not-yet-persisted tuples, as checkpointing huge state every 30
> > > > > seconds is costly.
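> > > > >
> > > > > A sketch of the one-file-per-checkpoint idea (newWriter() and
> > > > > uniqueFileName() are hypothetical helpers):
> > > > >
> > > > > @Override
> > > > > public void beforeCheckpoint(long windowId)
> > > > > {
> > > > >   try {
> > > > >     writer.close();                       // everything buffered so far lands in one file
> > > > >     writer = newWriter(uniqueFileName()); // fresh file for the next checkpoint interval
> > > > >   } catch (IOException e) {
> > > > >     throw new RuntimeException(e);
> > > > >   }
> > > > > }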
> > > > >
> > > > > Regards,
> > > > > -Tushar.
> > > > >
> > > > >
> > > > > On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <shubham@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Hello Community,
> > > > > >
> > > > > > Apache Parquet <https://parquet.apache.org/documentation/latest/> is
> > > > > > a column-oriented binary file format designed to be extremely
> > > > > > efficient and interoperable across the Hadoop ecosystem. It has
> > > > > > integrations with most of the Hadoop processing frameworks (Impala,
> > > > > > Hive, Pig, Spark ...) and serialization models (Thrift, Avro,
> > > > > > Protobuf), making it easy to use in ETL and processing pipelines.
> > > > > >
> > > > > > Having an operator to write data to Parquet files would certainly be
> > > > > > a good addition to the Malhar library.
> > > > > >
> > > > > > The underlying implementation
> > > > > > <http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
> > > > > > for writing data as Parquet requires a subclass of
> > > > > > parquet.hadoop.api.WriteSupport that knows how to take an in-memory
> > > > > > object and write Parquet primitives through parquet.io.api.RecordConsumer.
> > > > > > Currently, there are several WriteSupport implementations, including
> > > > > > ThriftWriteSupport, AvroWriteSupport, and ProtoWriteSupport.
> > > > > > These WriteSupport implementations are then wrapped as ParquetWriter
> > > > > > objects for writing.
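> > > > > >
> > > > > > For example, the Avro flavor can be used roughly like this (the path
> > > > > > and schema are illustrative):
> > > > > >
> > > > > > import org.apache.avro.Schema;
> > > > > > import org.apache.avro.generic.GenericRecord;
> > > > > > import org.apache.hadoop.fs.Path;
> > > > > > import parquet.avro.AvroParquetWriter;
> > > > > >
> > > > > > Schema schema = new Schema.Parser().parse(schemaJson); // illustrative schema string
> > > > > > AvroParquetWriter<GenericRecord> writer =
> > > > > >     new AvroParquetWriter<GenericRecord>(new Path("/tmp/out.parquet"), schema);
> > > > > > writer.write(record); // records are buffered in memory until a row group is written
> > > > > > writer.close();       // closing is the only point where everything is guaranteed flushed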
> > > > > >
> > > > > > Parquet writers do not expose a handle to the underlying stream. In
> > > > > > order to write data to a Parquet file, all the records (that belong
> > > > > > to the file) must be buffered in memory. These records are then
> > > > > > compressed and later flushed to the file.
> > > > > >
> > > > > > To start with, we could support the following features in the operator:
> > > > > >
> > > > > >    - *Ability to provide a WriteSupport implementation* : The user
> > > > > >    should be able to use existing implementations of
> > > > > >    parquet.hadoop.api.WriteSupport or provide his/her own implementation.
> > > > > >    - *Ability to configure Page Size* : Refers to the amount of
> > > > > >    uncompressed data for a single column that is read before it is
> > > > > >    compressed as a unit and buffered in memory to be written out as a
> > > > > >    "page". Default value: 1 MB.
> > > > > >    - *Ability to configure Parquet Block Size* : Refers to the amount
> > > > > >    of compressed data that should be buffered in memory before a row
> > > > > >    group is written out to disk. Larger block sizes require more
> > > > > >    memory to buffer the data; recommended is 128 MB / 256 MB.
> > > > > >    - *Flushing files periodically* : The operator would have to flush
> > > > > >    files periodically to a specified directory as per the configured
> > > > > >    block size. This could be time-based / number-of-events-based /
> > > > > >    size-based. (See the constructor sketch below for where these
> > > > > >    sizes plug in.)
> > > > > >
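> > > > > > As a sketch, the (pre-builder) ParquetWriter constructor takes these
> > > > > > sizes directly; the values below are just the defaults mentioned above:
> > > > > >
> > > > > > import org.apache.hadoop.fs.Path;
> > > > > > import parquet.hadoop.ParquetWriter;
> > > > > > import parquet.hadoop.metadata.CompressionCodecName;
> > > > > >
> > > > > > int blockSize = 128 * 1024 * 1024; // row-group size: 128 MB
> > > > > > int pageSize  = 1024 * 1024;       // page size: 1 MB
> > > > > > ParquetWriter<T> writer = new ParquetWriter<T>(
> > > > > >     new Path("/tmp/out.parquet"),  // illustrative destination
> > > > > >     writeSupport,                  // any parquet.hadoop.api.WriteSupport<T>
> > > > > >     CompressionCodecName.SNAPPY,
> > > > > >     blockSize,
> > > > > >     pageSize);
> > > > > >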
> > > > > > To implement the operator, here's one approach:
> > > > > >
> > > > > >    1. Extend the existing AbstractFileOutputOperator.
> > > > > >    2. Provide methods to add WriteSupport implementations.
> > > > > >    3. In the process method, hold the data in memory till we reach a
> > > > > >    configured size, and then flush the contents to a file during
> > > > > >    endWindow().
> > > > > >
> > > > > > Please send across your thoughts on this. I would also like to know
> > > > > > if we would be able to leverage the recovery mechanisms provided by
> > > > > > AbstractFileOutputOperator using this approach?
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Shubham
> > > > > >
> > > > >
> > > >
> > >
> >
>
