apex-dev mailing list archives

From Chandni Singh <singh.chan...@gmail.com>
Subject Re: Parquet Writer Operator
Date Fri, 17 Jun 2016 05:04:55 GMT
Dev,

The FileSystemWALWriter closes the temporary file as soon as it gets rotated.
It renames (finalizes) the temporary file to the actual file when the
window is committed. The mapping of temporary file to actual file is
present in the checkpointed state.

The FileSystemWALReader reads from the temporary file, so maybe you can use
that to read the WAL.
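
A rough sketch of that read path; the FileSystemWAL method names here
(setFilePath, getReader, next) are from memory and should be checked
against the Malhar class:

  import org.apache.apex.malhar.lib.wal.FileSystemWAL;
  import com.datatorrent.netlet.util.Slice;

  FileSystemWAL wal = new FileSystemWAL();
  wal.setFilePath("/user/dt/wal/parquet-spool");  // hypothetical location
  FileSystemWAL.FileSystemWALReader reader = wal.getReader();
  Slice entry;
  while ((entry = reader.next()) != null) {  // null assumed at end of WAL
    // deserialize entry and hand the record to the Parquet writer
  }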

Chandni



On Thu, Jun 16, 2016 at 9:55 PM, Devendra Tagare <devendrat@datatorrent.com>
wrote:

> Hi,
>
> WAL based approach :
>
> The FileSystemWAL.FileSystemWALWriter closes a temporary file only after
> the window is committed. We cannot read any such files till this point.
> Once a file is committed, the ParquetOutputOperator will have to read the
> committed files in the same committed callback, convert the spooled
> records from the WAL to Parquet and then write the Parquet file.
> A file can only be deleted from the WAL after it has been successfully
> written as a Parquet file.
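>
> A sketch of the committed hook under that approach; committed(long) is the
> Apex CheckpointListener callback, while the helpers named here are
> hypothetical:
>
>   @Override
>   public void committed(long windowId)
>   {
>     // windows up to windowId are committed; their WAL files are final
>     for (String walFile : walFilesUpTo(windowId)) {  // hypothetical helper
>       writeParquetFile(walFile);  // convert spooled records to Parquet
>       deleteFromWal(walFile);     // only after a successful Parquet write
>     }
>   }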
>
> Small files problem : to handle this with a WAL based approach, we will
> have to read files from the WAL till the Parquet block size is reached.
> This means the WAL reader could end up polling files for windows before
> the highest committed window, since the block size may not have been
> reached in the committed callback for a given window.
>
> Fault tolerance : if the Parquet writes to the file system fail, then the
> operator will go down. In this case we will have to add retry logic to
> read the files from the WAL for the windows which failed.
>
> Please let me know if I am missing something in using the WAL and also if
> using a 2 operator solution would be better suited in this case.
>
> Thanks,
> Dev
>
>
> On Wed, Jun 15, 2016 at 5:02 PM, Thomas Weise <thomas@datatorrent.com>
> wrote:
>
> > Hi Dev,
> >
> > Can you not use the existing WAL implementation (via WindowDataManager or
> > directly)?
> >
> > Thomas
> >
> >
> > On Wed, Jun 15, 2016 at 3:47 PM, Devendra Tagare <
> > devendrat@datatorrent.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Initial thoughts were to go for a WAL based approach where the operator
> > > would first write POJOs to the WAL and then a separate thread would do
> > > the task of reading from the WAL and writing the destination files
> > > based on the block size.
> > >
> > > There is a ticket open for a pluggable spooling implementation with
> > > output operators which can be leveraged for this:
> > > https://issues.apache.org/jira/browse/APEXMALHAR-2037
> > >
> > > Since work is already being done on that front, we can plug in the
> > > spooler with the existing implementation of the ParquetFileWriter at
> > > that point and remove the first operator - ParquetFileOutputOperator.
> > >
> > > Thanks,
> > > Dev
> > >
> > > On Tue, Jun 14, 2016 at 7:21 PM, Thomas Weise <thomas@datatorrent.com>
> > > wrote:
> > >
> > > > What's the reason for not considering the WAL based approach?
> > > >
> > > > What are the pros and cons?
> > > >
> > > >
> > > > On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <
> > > > devendrat@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > We can focus on the below 2 problems:
> > > > > 1. Avoid the small files problem which could arise due to a flush at
> > > > > every endWindow, since there wouldn't be significant data in a window.
> > > > > 2. Fault tolerance.
> > > > >
> > > > > *Proposal* : Create a module in which there are 2 operators,
> > > > >
> > > > > *Operator 1 : ParquetFileOutputOperator*
> > > > > This operator will be an implementation of the
> > > > > AbstractFileOutputOperator. It will write data to a HDFS location
> > > > > and leverage the fault-tolerance semantics of the
> > > > > AbstractFileOutputOperator.
> > > > >
> > > > > This operator will implement the CheckpointNotificationListener and
> > > > > will emit the finalizedFiles from the beforeCheckpoint method as a
> > > > > Map<windowId, Set<files finalized in the window>>.
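> > > > >
> > > > > A sketch of that hook; the port and the filesFinalizedPerWindow
> > > > > field are illustrative, not an existing Malhar class:
> > > > >
> > > > >   public final transient DefaultOutputPort<Map<Long, Set<String>>>
> > > > >       finalizedFilesPort = new DefaultOutputPort<>();
> > > > >
> > > > >   @Override
> > > > >   public void beforeCheckpoint(long windowId)
> > > > >   {
> > > > >     // emit files finalized per window since the last checkpoint
> > > > >     finalizedFilesPort.emit(filesFinalizedPerWindow);  // hypothetical
> > > > >   }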
> > > > >
> > > > > *Operator 2 : ParquetFileWriter*
> > > > > This operator will receive the Map<windowId, Set<files finalized in
> > > > > the window>> from the ParquetFileOutputOperator on its input port.
> > > > > Once it receives this map, it will do the following:
> > > > >
> > > > > 1. Save the input received to a Map<windowId, Set<InputFiles>>
> > > > > inputFilesMap.
> > > > >
> > > > > 2. Instantiate a new ParquetWriter.
> > > > >   2.a. Get a unique file name.
> > > > >   2.b. Add a configurable writer that extends the ParquetWriter and
> > > > >   includes a write support for writing various supported formats
> > > > >   like Avro, Thrift etc.
> > > > >
> > > > > 3. For each file from the inputFilesMap (sketched below),
> > > > >   3.a. Read the file and write the records using the writer created
> > > > >   in (2).
> > > > >   3.b. Check if the block size (configurable) is reached. If yes,
> > > > >   close the file and add its entry to a Map<windowId, CompletedFiles>
> > > > >   completedFilesMap, and remove the entry from inputFilesMap. If the
> > > > >   writes fail, the files can be reprocessed from the inputFilesMap.
> > > > >   3.c. In the committed callback, remove the completed files from the
> > > > >   directory and prune the completedFilesMap for that window.
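> > > > >
> > > > > A sketch of the loop in step 3, assuming ParquetWriter.getDataSize()
> > > > > for the approximate size check; readRecords, createWriter and
> > > > > uniqueFileName are hypothetical helpers, and IOException handling is
> > > > > omitted:
> > > > >
> > > > >   for (Map.Entry<Long, Set<String>> e : inputFilesMap.entrySet()) {
> > > > >     for (String file : e.getValue()) {
> > > > >       for (GenericRecord r : readRecords(file)) {  // hypothetical
> > > > >         writer.write(r);
> > > > >       }
> > > > >       if (writer.getDataSize() >= blockSize) {  // approximate (3.b)
> > > > >         writer.close();
> > > > >         completedFilesMap.computeIfAbsent(e.getKey(),
> > > > >             w -> new HashSet<String>()).add(file);
> > > > >         writer = createWriter(uniqueFileName());  // hypothetical
> > > > >       }
> > > > >     }
> > > > >   }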
> > > > >
> > > > > Points to note:
> > > > > 1. The block size check will be approximate since the data is in
> > > > > memory and ParquetWriter does not expose a flush.
> > > > > 2. This is at best a temporary implementation in the absence of a
> > > > > WAL based approach.
> > > > >
> > > > > I would like to take a crack at this operator based on community
> > > > > feedback.
> > > > >
> > > > > Thoughts ?
> > > > >
> > > > > Thanks,
> > > > > Dev
> > > > >
> > > > >
> > > > > On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <tushar@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Shubham,
> > > > > >
> > > > > > +1 for the Parquet writer.
> > > > > >
> > > > > > I doubt if we could leverage the recovery mechanism provided by
> > > > > > AbstractFileOutputOperator, as the Parquet writer does not expose a
> > > > > > flush and could write to the underlying stream at any time. To
> > > > > > simplify recovery you can write a single file in each checkpoint
> > > > > > duration. If this is not an option, then you need to make use of a
> > > > > > WAL for recovery, and not use operator check-pointing for storing
> > > > > > not-yet-persisted tuples, as checkpointing huge state every 30
> > > > > > seconds is costly.
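> > > > > >
> > > > > > A sketch of the single-file-per-checkpoint idea; beforeCheckpoint
> > > > > > is the Apex CheckpointNotificationListener callback, and
> > > > > > createWriter and uniqueFileName are hypothetical helpers:
> > > > > >
> > > > > >   @Override
> > > > > >   public void beforeCheckpoint(long windowId)
> > > > > >   {
> > > > > >     try {
> > > > > >       // closing forces the buffered row groups to disk, so the
> > > > > >       // file boundary lines up with the checkpoint
> > > > > >       writer.close();
> > > > > >       writer = createWriter(uniqueFileName());  // hypothetical
> > > > > >     } catch (IOException e) {
> > > > > >       throw new RuntimeException(e);
> > > > > >     }
> > > > > >   }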
> > > > > >
> > > > > > Regards,
> > > > > > -Tushar.
> > > > > >
> > > > > >
> > > > > > On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <shubham@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hello Community,
> > > > > > >
> > > > > > > Apache Parquet <https://parquet.apache.org/documentation/latest/>
> > > > > > > is a column-oriented binary file format designed to be extremely
> > > > > > > efficient and interoperable across the Hadoop ecosystem. It has
> > > > > > > integrations with most of the Hadoop processing frameworks
> > > > > > > (Impala, Hive, Pig, Spark..) and serialization models (Thrift,
> > > > > > > Avro, Protobuf), making it easy to use in ETL and processing
> > > > > > > pipelines.
> > > > > > >
> > > > > > > Having an operator to write data to Parquet files would
> > > > > > > certainly be a good addition to the Malhar library.
> > > > > > >
> > > > > > > The underlying implementation
> > > > > > > <http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
> > > > > > > for writing data as Parquet requires a subclass of
> > > > > > > parquet.hadoop.api.WriteSupport that knows how to take an
> > > > > > > in-memory object and write Parquet primitives through
> > > > > > > parquet.io.api.RecordConsumer. Currently, there are several
> > > > > > > WriteSupport implementations, including ThriftWriteSupport,
> > > > > > > AvroWriteSupport, and ProtoWriteSupport. These WriteSupport
> > > > > > > implementations are then wrapped as ParquetWriter objects for
> > > > > > > writing.
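> > > > > > >
> > > > > > > For example, with the Avro binding (pre-1.8 parquet-avro package
> > > > > > > names, matching the classes above; schemaJson and record are
> > > > > > > assumed to exist):
> > > > > > >
> > > > > > >   import org.apache.avro.Schema;
> > > > > > >   import org.apache.avro.generic.GenericRecord;
> > > > > > >   import org.apache.hadoop.fs.Path;
> > > > > > >   import parquet.avro.AvroParquetWriter;
> > > > > > >
> > > > > > >   // AvroWriteSupport is wrapped internally by AvroParquetWriter
> > > > > > >   Schema schema = new Schema.Parser().parse(schemaJson);
> > > > > > >   AvroParquetWriter<GenericRecord> writer =
> > > > > > >       new AvroParquetWriter<GenericRecord>(
> > > > > > >           new Path("/tmp/out.parquet"), schema);
> > > > > > >   writer.write(record);  // buffered until a row group is flushed
> > > > > > >   writer.close();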
> > > > > > >
> > > > > > > Parquet writers do not expose a handle to the underlying stream.
> > > > > > > In order to write data to a Parquet file, all the records (that
> > > > > > > belong to the file) must be buffered in memory. These records are
> > > > > > > then compressed and later flushed to the file.
> > > > > > >
> > > > > > > To start with, we could support the following features in the
> > > > > > > operator:
> > > > > > >
> > > > > > >    - *Ability to provide a WriteSupport implementation* : The
> > > > > > >    user should be able to use existing implementations of
> > > > > > >    parquet.hadoop.api.WriteSupport or provide his/her own
> > > > > > >    implementation.
> > > > > > >    - *Ability to configure Page Size* : Refers to the amount of
> > > > > > >    uncompressed data for a single column that is read before it
> > > > > > >    is compressed as a unit and buffered in memory to be written
> > > > > > >    out as a “page”. Default value : 1 MB.
> > > > > > >    - *Ability to configure Parquet Block Size* : Refers to the
> > > > > > >    amount of compressed data that should be buffered in memory
> > > > > > >    before a row group is written out to disk. Larger block sizes
> > > > > > >    require more memory to buffer the data; recommended is
> > > > > > >    128 MB / 256 MB. (Both sizes are wired through the writer in
> > > > > > >    the sketch below.)
> > > > > > >    - *Flushing files periodically* : The operator would have to
> > > > > > >    flush files periodically in a specified directory as per the
> > > > > > >    configured block size. This could be time-based / number of
> > > > > > >    events based / size based.
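> > > > > > >
> > > > > > > A sketch of wiring those sizes through the writer; this uses the
> > > > > > > pre-1.8 parquet-hadoop constructor, and myWriteSupport (a
> > > > > > > WriteSupport<MyPojo>) and MyPojo are placeholders:
> > > > > > >
> > > > > > >   import org.apache.hadoop.fs.Path;
> > > > > > >   import parquet.hadoop.ParquetWriter;
> > > > > > >   import parquet.hadoop.metadata.CompressionCodecName;
> > > > > > >
> > > > > > >   int blockSize = 128 * 1024 * 1024;  // row group target: 128 MB
> > > > > > >   int pageSize = 1024 * 1024;         // page target: 1 MB
> > > > > > >   ParquetWriter<MyPojo> writer = new ParquetWriter<MyPojo>(
> > > > > > >       new Path("/tmp/out.parquet"), myWriteSupport,
> > > > > > >       CompressionCodecName.SNAPPY, blockSize, pageSize);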
> > > > > > >
> > > > > > > To implement the operator, here's one approach:
> > > > > > >
> > > > > > >    1. Extend the existing AbstractFileOutputOperator.
> > > > > > >    2. Provide methods to add write support implementations.
> > > > > > >    3. In the process method, hold the data in memory till we
> > > > > > >    reach a configured size and then flush the contents to a file
> > > > > > >    during endWindow(), as in the sketch below.
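> > > > > > >
> > > > > > > A minimal sketch of step 3, assuming an Avro GenericRecord
> > > > > > > input; buffer, blockSize, estimatedSize and writeBufferToFile
> > > > > > > are hypothetical members of the operator:
> > > > > > >
> > > > > > >   public final transient DefaultInputPort<GenericRecord> input =
> > > > > > >       new DefaultInputPort<GenericRecord>()
> > > > > > >   {
> > > > > > >     @Override
> > > > > > >     public void process(GenericRecord record)
> > > > > > >     {
> > > > > > >       buffer.add(record);  // hold records in memory
> > > > > > >     }
> > > > > > >   };
> > > > > > >
> > > > > > >   @Override
> > > > > > >   public void endWindow()
> > > > > > >   {
> > > > > > >     if (estimatedSize(buffer) >= blockSize) {  // hypothetical
> > > > > > >       writeBufferToFile(buffer);  // writes via a ParquetWriter
> > > > > > >       buffer.clear();
> > > > > > >     }
> > > > > > >   }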
> > > > > > >
> > > > > > > Please send across your thoughts on this. I would also like to
> > > > > > > know if we would be able to leverage the recovery mechanisms
> > > > > > > provided by AbstractFileOutputOperator using this approach.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Shubham
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
