apex-dev mailing list archives

From Devendra Tagare <devend...@datatorrent.com>
Subject Re: Parquet Writer Operator
Date Fri, 17 Jun 2016 23:20:19 GMT
Hi,

I will try this approach with a prototype and get back.

Thanks,
Dev

On Thu, Jun 16, 2016 at 10:04 PM, Chandni Singh <singh.chandni@gmail.com>
wrote:

> Dev,
>
> The FileSystemWALWriter closes the temporary file as soon as it gets
> rotated. It renames (finalizes) the temporary file to the actual file once
> the window is committed. The mapping of temporary file to actual file is
> present in the checkpointed state.
>
> The FileSystemWALReader reads from the temporary file, so maybe you can use
> that to read the WAL.
>
> Chandni
>
>
>
> On Thu, Jun 16, 2016 at 9:55 PM, Devendra Tagare <
> devendrat@datatorrent.com>
> wrote:
>
> > Hi,
> >
> > WAL-based approach:
> >
> > The FileSystemWAL.FileSystemWALWriter closes a temporary file only after
> > the window is committed. We cannot read any such files till this point.
> > Once this file is committed, in the same committed callback the
> > ParquetOutputOperator will have to read the committed files, convert the
> > spooled records from the WAL to Parquet and then write the Parquet file.
> > A file can only be deleted from the WAL after it has been successfully
> > written as a Parquet file.
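> >
> > A rough sketch of the committed-callback flow I have in mind (the
> > readSpooledRecords / newParquetWriter / deleteFromWal helpers below are
> > placeholders for illustration, not the actual FileSystemWAL API):
> >
> >     // Sketch: inside the ParquetOutputOperator, assuming the WAL files
> >     // spooled per window are tracked in a NavigableMap<Long, Set<String>>.
> >     @Override
> >     public void committed(long windowId)
> >     {
> >       try {
> >         for (Set<String> walFiles : walFilesPerWindow.headMap(windowId, true).values()) {
> >           for (String walFile : walFiles) {
> >             ParquetWriter<GenericRecord> writer = newParquetWriter();   // placeholder
> >             for (GenericRecord record : readSpooledRecords(walFile)) {  // placeholder
> >               writer.write(record);
> >             }
> >             writer.close();          // row groups and footer are flushed here
> >             deleteFromWal(walFile);  // only after the Parquet file is safely written
> >           }
> >         }
> >         walFilesPerWindow.headMap(windowId, true).clear();
> >       } catch (IOException e) {
> >         throw new RuntimeException(e);  // operator goes down; retry from the WAL on redeploy
> >       }
> >     }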
> >
> > Small files problem: to handle this with a WAL-based approach, we will
> > have to read files from the WAL till the Parquet block size is reached.
> > This means the WAL reader could end up polling files for windows before
> > the highest committed window, since the block size may not have been
> > reached in the committed callback for a given window.
> >
> > Fault tolerance: if the Parquet writes to the file system fail, then the
> > operator will go down. In this case we will have to add retry logic to
> > read the files from the WAL for the windows which failed.
> >
> > Please let me know if I am missing something in using the WAL and also if
> > using a 2-operator solution would be better suited in this case.
> >
> > Thanks,
> > Dev
> >
> >
> > On Wed, Jun 15, 2016 at 5:02 PM, Thomas Weise <thomas@datatorrent.com>
> > wrote:
> >
> > > Hi Dev,
> > >
> > > Can you not use the existing WAL implementation (via WindowDataManager
> > > or directly)?
> > >
> > > Thomas
> > >
> > >
> > > On Wed, Jun 15, 2016 at 3:47 PM, Devendra Tagare <
> > > devendrat@datatorrent.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Initial thoughts were to go for a WAL-based approach where the operator
> > > > would first write POJOs to the WAL and then a separate thread would do
> > > > the task of reading from the WAL and writing the destination files
> > > > based on the block size.
> > > >
> > > > There is a ticket open for a pluggable spooling implementation with
> > > > output operators which can be leveraged for this:
> > > > https://issues.apache.org/jira/browse/APEXMALHAR-2037
> > > >
> > > > Since work is already being done on that front, we can plug in the
> > > > spooler with the existing implementation of the ParquetFileWriter at
> > > > that point and remove the first operator - ParquetFileOutputOperator.
> > > >
> > > > Thanks,
> > > > Dev
> > > >
> > > > On Tue, Jun 14, 2016 at 7:21 PM, Thomas Weise <thomas@datatorrent.com>
> > > > wrote:
> > > >
> > > > > What's the reason for not considering the WAL based approach?
> > > > >
> > > > > What are the pros and cons?
> > > > >
> > > > >
> > > > > On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <
> > > > > devendrat@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > We can focus on the below 2 problems,
> > > > > > 1. Avoid the small files problem which could arise due to a flush at
> > > > > > every endWindow, since there wouldn't be significant data in a window.
> > > > > > 2. Fault tolerance.
> > > > > >
> > > > > > *Proposal*: Create a module in which there are 2 operators,
> > > > > >
> > > > > > *Operator 1: ParquetFileOutputOperator*
> > > > > > This operator will be an implementation of the
> > > > > > AbstractFileOutputOperator. It will write data to an HDFS location
> > > > > > and leverage the fault-tolerance semantics of the
> > > > > > AbstractFileOutputOperator.
> > > > > >
> > > > > > This operator will implement the CheckpointNotificationListener and
> > > > > > will emit the finalizedFiles from the beforeCheckpoint method as a
> > > > > > Map<windowId,Set<files finalized in the window>>.
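> > > > > >
> > > > > > A rough sketch of what I mean for operator 1 (the output port and the
> > > > > > finalizedFilesPerWindow bookkeeping are hypothetical names, just for
> > > > > > illustration):
> > > > > >
> > > > > >     // Sketch: ParquetFileOutputOperator extends AbstractFileOutputOperator<byte[]>
> > > > > >     // and implements Operator.CheckpointNotificationListener.
> > > > > >     public final transient DefaultOutputPort<Map<Long, Set<String>>> finalizedFilesPort =
> > > > > >         new DefaultOutputPort<>();
> > > > > >
> > > > > >     // hypothetical bookkeeping of files finalized per window
> > > > > >     private final Map<Long, Set<String>> finalizedFilesPerWindow = new HashMap<>();
> > > > > >
> > > > > >     @Override
> > > > > >     public void beforeCheckpoint(long windowId)
> > > > > >     {
> > > > > >       // emit Map<windowId, Set<files finalized in the window>> downstream
> > > > > >       finalizedFilesPort.emit(new HashMap<>(finalizedFilesPerWindow));
> > > > > >       finalizedFilesPerWindow.clear();
> > > > > >     }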
> > > > > >
> > > > > > *Operator 2: ParquetFileWriter*
> > > > > > This operator will receive a Set<files finalized in the window> from
> > > > > > the ParquetFileOutputOperator on its input port.
> > > > > > Once it receives this map, it will do the below things,
> > > > > >
> > > > > > 1. Save the input received to a Map<windowId,Set<InputFiles>>
> > > > > > inputFilesMap.
> > > > > >
> > > > > > 2. Instantiate a new ParquetWriter.
> > > > > >   2.a. Get a unique file name.
> > > > > >   2.b. Add a configurable writer that extends the ParquetWriter and
> > > > > > includes a write support for writing the various supported formats
> > > > > > like Avro, Thrift etc.
> > > > > >
> > > > > > 3. For each file from the inputFilesMap,
> > > > > >   3.a. Read the file and write the records using the writer created
> > > > > > in (2).
> > > > > >   3.b. Check if the (configurable) block size is reached. If yes, then
> > > > > > close the file, add its entry to a Map<windowId,CompletedFiles>
> > > > > > completedFilesMap and remove the entry from inputFilesMap.
> > > > > >        If the writes fail then the files can be reprocessed from the
> > > > > > inputFilesMap.
> > > > > >   3.c. In the committed callback remove the completed files from the
> > > > > > directory and prune the completedFilesMap for that window (a rough
> > > > > > sketch follows below).
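> > > > > >
> > > > > > Something like the following for 3.b / 3.c (all names here are
> > > > > > hypothetical, just to illustrate the flow; the size check is
> > > > > > approximate since ParquetWriter does not expose a flush):
> > > > > >
> > > > > >     // Sketch: inside ParquetFileWriter, using hypothetical maps/helpers.
> > > > > >     private void processInputFile(long windowId, Path inputFile) throws IOException
> > > > > >     {
> > > > > >       for (GenericRecord record : readRecords(inputFile)) {  // hypothetical reader
> > > > > >         writer.write(record);
> > > > > >         bytesWritten += estimateSize(record);  // approximate, data is still in memory
> > > > > >       }
> > > > > >       if (bytesWritten >= blockSize) {
> > > > > >         writer.close();  // 3.b: close the Parquet file once the block size is reached
> > > > > >         completedFilesMap.computeIfAbsent(windowId, w -> new HashSet<>()).add(currentOutputFile);
> > > > > >         inputFilesMap.get(windowId).remove(inputFile);
> > > > > >         writer = newWriter();  // 2.a / 2.b: new unique file name + write support
> > > > > >         bytesWritten = 0;
> > > > > >       }
> > > > > >     }
> > > > > >
> > > > > >     @Override
> > > > > >     public void committed(long windowId)
> > > > > >     {
> > > > > >       // 3.c: remove the completed files from the directory and prune the map
> > > > > >       for (Set<Path> completed : completedFilesMap.headMap(windowId, true).values()) {
> > > > > >         deleteFromDirectory(completed);  // hypothetical cleanup helper
> > > > > >       }
> > > > > >       completedFilesMap.headMap(windowId, true).clear();
> > > > > >     }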
> > > > > >
> > > > > > Points to note,
> > > > > > 1. The block size check will be approximate since the data is in
> > > > > > memory and ParquetWriter does not expose a flush.
> > > > > > 2. This is at best a temporary implementation in the absence of a
> > > > > > WAL-based approach.
> > > > > >
> > > > > > I would like to take a crack at this operator based on community
> > > > > > feedback.
> > > > > >
> > > > > > Thoughts ?
> > > > > >
> > > > > > Thanks,
> > > > > > Dev
> > > > > >
> > > > > > On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <tushar@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Shubham,
> > > > > > >
> > > > > > > +1 for the Parquet writer.
> > > > > > >
> > > > > > > I doubt if we could leverage the recovery mechanism provided by
> > > > > > > AbstractFileOutputOperator, as the Parquet Writer does not expose
> > > > > > > flush and could write to the underlying stream at any time. To
> > > > > > > simplify recovery you can write a single file in each checkpoint
> > > > > > > duration. If this is not an option, then you need to make use of a
> > > > > > > WAL for recovery, and not use operator check-pointing for storing
> > > > > > > unpersisted tuples, as checkpointing huge state every 30 seconds is
> > > > > > > costly.
> > > > > > >
> > > > > > > Regards,
> > > > > > > -Tushar.
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <shubham@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Community,
> > > > > > > >
> > > > > > > > Apache Parquet <https://parquet.apache.org/documentation/latest/>
> > > > > > > > is a column-oriented binary file format designed to be extremely
> > > > > > > > efficient and interoperable across the Hadoop ecosystem. It has
> > > > > > > > integrations with most of the Hadoop processing frameworks (Impala,
> > > > > > > > Hive, Pig, Spark..) and serialization models (Thrift, Avro,
> > > > > > > > Protobuf), making it easy to use in ETL and processing pipelines.
> > > > > > > >
> > > > > > > > Having an operator to write data to Parquet files would certainly
> > > > > > > > be a good addition to the Malhar library.
> > > > > > > >
> > > > > > > > The underlying implementation
> > > > > > > > <http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
> > > > > > > > for writing data as Parquet requires a subclass of
> > > > > > > > parquet.hadoop.api.WriteSupport that knows how to take an in-memory
> > > > > > > > object and write Parquet primitives through
> > > > > > > > parquet.io.api.RecordConsumer. Currently, there are several
> > > > > > > > WriteSupport implementations, including ThriftWriteSupport,
> > > > > > > > AvroWriteSupport, and ProtoWriteSupport.
> > > > > > > > These WriteSupport implementations are then wrapped as ParquetWriter
> > > > > > > > objects for writing.
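> > > > > > > >
> > > > > > > > For example, a minimal sketch with parquet-avro (exact
> > > > > > > > constructors / versions to be confirmed; the path, schema and
> > > > > > > > field here are made up):
> > > > > > > >
> > > > > > > >     // AvroWriteSupport is wrapped by AvroParquetWriter internally
> > > > > > > >     Schema schema = new Schema.Parser().parse(avroSchemaJson);
> > > > > > > >     ParquetWriter<GenericRecord> writer =
> > > > > > > >         new AvroParquetWriter<GenericRecord>(new Path("/tmp/out.parquet"), schema);
> > > > > > > >
> > > > > > > >     GenericRecord record = new GenericData.Record(schema);
> > > > > > > >     record.put("field", "value");
> > > > > > > >     writer.write(record);
> > > > > > > >     writer.close();  // records stay buffered in memory until this point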
> > > > > > > >
> > > > > > > > Parquet Writers do not expose a handle to the underlying stream.
> > > > > > > > In order to write data to a Parquet file, all the records (that
> > > > > > > > belong to the file) must be buffered in memory. These records are
> > > > > > > > then compressed and later flushed to the file.
> > > > > > > >
> > > > > > > > To start with, we could support the following features in the
> > > > > > > > operator
> > > > > > > >
> > > > > > > >    - *Ability to provide a WriteSupport implementation*: The user
> > > > > > > >    should be able to use existing implementations of
> > > > > > > >    parquet.hadoop.api.WriteSupport or provide his/her own
> > > > > > > >    implementation.
> > > > > > > >    - *Ability to configure Page Size*: Refers to the amount of
> > > > > > > >    uncompressed data for a single column that is read before it is
> > > > > > > >    compressed as a unit and buffered in memory to be written out as
> > > > > > > >    a “page”. Default value: 1 MB.
> > > > > > > >    - *Ability to configure Parquet Block Size*: Refers to the
> > > > > > > >    amount of compressed data that should be buffered in memory
> > > > > > > >    before a row group is written out to disk. Larger block sizes
> > > > > > > >    require more memory to buffer the data; recommended is 128 MB /
> > > > > > > >    256 MB (see the sketch after this list).
> > > > > > > >    - *Flushing files periodically*: The operator would have to
> > > > > > > >    flush files periodically in a specified directory as per the
> > > > > > > >    configured block size. This could be time-based / number of
> > > > > > > >    events based / size based.
> > > > > > > >
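> > > > > > > > Block size and page size would essentially be passed through to
> > > > > > > > the underlying writer, e.g. (constructor from the 1.x parquet-avro
> > > > > > > > API, to be confirmed; file path, schema and codec here are only
> > > > > > > > placeholders):
> > > > > > > >
> > > > > > > >     int blockSize = 128 * 1024 * 1024;  // row group size, 128 MB
> > > > > > > >     int pageSize = 1024 * 1024;          // 1 MB
> > > > > > > >
> > > > > > > >     ParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
> > > > > > > >         new Path(filePath), schema, CompressionCodecName.SNAPPY, blockSize, pageSize);
> > > > > > > >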
> > > > > > > > To implement the operator, here's one approach:
> > > > > > > >
> > > > > > > >    1. Extend the existing AbstractFileOutputOperator.
> > > > > > > >    2. Provide methods to add write support implementations.
> > > > > > > >    3. In the process method, hold the data in memory till we reach
> > > > > > > >    a configured size and then flush the contents to a file during
> > > > > > > >    endWindow() (a rough sketch follows below).
> > > > > > > >
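> > > > > > > > Roughly along these lines (sketch only; the buffer fields and the
> > > > > > > > flushBufferToParquetFile helper are made-up names):
> > > > > > > >
> > > > > > > >     // Sketch on top of AbstractFileOutputOperator's lifecycle
> > > > > > > >     @Override
> > > > > > > >     protected void processTuple(GenericRecord tuple)
> > > > > > > >     {
> > > > > > > >       buffer.add(tuple);
> > > > > > > >       bufferedSize += estimateSize(tuple);  // hypothetical size estimate
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public void endWindow()
> > > > > > > >     {
> > > > > > > >       if (bufferedSize >= configuredSize) {
> > > > > > > >         flushBufferToParquetFile(buffer);  // wraps the configured WriteSupport
> > > > > > > >         buffer.clear();
> > > > > > > >         bufferedSize = 0;
> > > > > > > >       }
> > > > > > > >       super.endWindow();
> > > > > > > >     }
> > > > > > > >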
> > > > > > > > Please send across your thoughts on this. I would also like to
> > > > > > > > know if we would be able to leverage the recovery mechanisms
> > > > > > > > provided by AbstractFileOutputOperator using this approach?
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Shubham
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
