apex-dev mailing list archives

From Devendra Tagare <devend...@datatorrent.com>
Subject Re: Parquet Writer Operator
Date Wed, 15 Jun 2016 01:54:13 GMT
Hi All,

We can focus on the two problems below:
1. Avoiding the small-files problem that could arise from a flush at every
endWindow, since a single window may not contain significant data.
2. Fault tolerance.

*Proposal* : Create a module containing two operators.

*Operator 1 : ParquetFileOutputOperator*
This operator will be an implementation of AbstractFileOutputOperator. It
will write data to an HDFS location and leverage the fault-tolerance
semantics of AbstractFileOutputOperator.

This operator will implement CheckpointNotificationListener and emit the
finalized files from the beforeCheckpoint method as a
Map<windowId, Set<files finalized in the window>>.
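
To make this concrete, here is a rough, untested sketch of what Operator 1
could look like. The port and field names are only placeholders, the hook that
records a finalized part file is left abstract (it depends on what
AbstractFileOutputOperator exposes in the Malhar version we target), and it
assumes the base operator already provides the checkpointed()/committed()
implementations it uses for its own file finalization, so only beforeCheckpoint
is added here:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class ParquetFileOutputOperator extends AbstractFileOutputOperator<byte[]>
    implements Operator.CheckpointNotificationListener {

  // files finalized since the last checkpoint, keyed by the window they were closed in
  private Map<Long, Set<String>> finalizedInWindow = new HashMap<Long, Set<String>>();
  private transient long currentWindowId;

  // emits Map<windowId, Set<files finalized in the window>> to the ParquetFileWriter
  public final transient DefaultOutputPort<Map<Long, Set<String>>> finalizedFilesPort =
      new DefaultOutputPort<Map<Long, Set<String>>>();

  @Override
  public void beginWindow(long windowId) {
    super.beginWindow(windowId);
    currentWindowId = windowId;
  }

  // placeholder: call this from whichever hook the base operator offers when a
  // part file is rolled over / finalized
  protected void recordFinalizedFile(String fileName) {
    Set<String> files = finalizedInWindow.get(currentWindowId);
    if (files == null) {
      files = new HashSet<String>();
      finalizedInWindow.put(currentWindowId, files);
    }
    files.add(fileName);
  }

  @Override
  protected String getFileName(byte[] tuple) {
    return "staging";  // single logical file; the base operator rolls it over by maxLength
  }

  @Override
  protected byte[] getBytesForTuple(byte[] tuple) {
    return tuple;
  }

  @Override
  public void beforeCheckpoint(long windowId) {
    // hand the finalized files downstream just before the state is checkpointed
    if (!finalizedInWindow.isEmpty()) {
      finalizedFilesPort.emit(new HashMap<Long, Set<String>>(finalizedInWindow));
      finalizedInWindow.clear();
    }
  }
}

Emitting from beforeCheckpoint (rather than endWindow) keeps the emitted map
consistent with the state that is about to be checkpointed.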

*Operator 2 : ParquetFileWriter*
This operator will receive the Map<windowId, Set<files finalized in the
window>> emitted by the ParquetFileOutputOperator on its input port.
Once it receives this map, it will do the following (a rough sketch is
included after these steps):

1. Save the received input into a Map<windowId, Set<InputFiles>> inputFilesMap.

2. Instantiate a new ParquetWriter.
  2.a. Generate a unique file name.
  2.b. Use a configurable writer that extends ParquetWriter and plugs in a
WriteSupport for the supported formats (Avro, Thrift, etc.).

3. For each file from the inputFilesMap:
  3.a. Read the file and write its records using the writer created in (2).
  3.b. Check whether the (configurable) block size has been reached. If so,
close the output file, add its entry to a
Map<windowId, CompletedFiles> completedFilesMap, and remove the entry from
inputFilesMap. If the writes fail, the files can be reprocessed from the
inputFilesMap.
  3.c. In the committed callback, remove the completed intermediate files
from the directory and prune the completedFilesMap for that window.
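
A similarly rough sketch of Operator 2 is below. It assumes an Avro write
support via AvroParquetWriter purely for illustration (any other WriteSupport
could be plugged in the same way), leaves the conversion from a staged line to
a GenericRecord to a concrete subclass, and treats outputDir/schemaString as
hypothetical properties. The size tracking is only an estimate, as noted below:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

public abstract class ParquetFileWriter extends BaseOperator implements Operator.CheckpointListener {
  private Map<Long, Set<String>> inputFilesMap = new HashMap<Long, Set<String>>();
  private Map<Long, Set<String>> completedFilesMap = new HashMap<Long, Set<String>>();
  private long blockSize = 128L * 1024 * 1024;  // configurable; the check is only approximate
  private String outputDir;                     // hypothetical: destination directory for .parquet files
  private String schemaString;                  // hypothetical: Avro schema of the records
  private transient FileSystem fs;
  private transient Schema schema;

  // receives Map<windowId, Set<files finalized in the window>> from Operator 1
  public final transient DefaultInputPort<Map<Long, Set<String>>> input =
      new DefaultInputPort<Map<Long, Set<String>>>() {
    @Override
    public void process(Map<Long, Set<String>> finalizedFiles) {
      inputFilesMap.putAll(finalizedFiles);  // (1) kept so failed writes can be reprocessed
      writePendingFiles();
    }
  };

  // how a line read from a staged file becomes a record is left to the concrete subclass
  protected abstract GenericRecord toRecord(String line);

  @Override
  public void setup(OperatorContext context) {
    try {
      fs = FileSystem.newInstance(new Configuration());
      schema = new Schema.Parser().parse(schemaString);
    } catch (IOException e) { throw new RuntimeException(e); }
  }

  private void writePendingFiles() {
    Iterator<Map.Entry<Long, Set<String>>> it = inputFilesMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Set<String>> entry = it.next();
      try {
        ParquetWriter<GenericRecord> writer = newWriter();  // (2) unique destination file
        long approxBytes = 0;
        for (String file : entry.getValue()) {
          // (3.a) read the staged file and push every record through the Parquet writer
          BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(file))));
          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            writer.write(toRecord(line));
            approxBytes += line.length();  // rough estimate only: ParquetWriter exposes no flush
          }
          reader.close();
          if (approxBytes >= blockSize) {  // (3.b) approximate block-size check
            writer.close();                // close this output file and roll to a new one
            writer = newWriter();
            approxBytes = 0;
          }
        }
        writer.close();
        completedFilesMap.put(entry.getKey(), new HashSet<String>(entry.getValue()));
        it.remove();  // (3.b) this window's staged files are done
      } catch (IOException e) {
        // leave the entry in inputFilesMap so the files can be reprocessed after recovery
        throw new RuntimeException(e);
      }
    }
  }

  private ParquetWriter<GenericRecord> newWriter() throws IOException {
    Path dest = new Path(outputDir, UUID.randomUUID().toString() + ".parquet");
    return AvroParquetWriter.<GenericRecord>builder(dest).withSchema(schema).build();
  }

  // (3.c) in the committed callback, delete the staged inputs and prune completedFilesMap
  @Override
  public void committed(long windowId) {
    Iterator<Map.Entry<Long, Set<String>>> it = completedFilesMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Set<String>> entry = it.next();
      if (entry.getKey() > windowId) { continue; }
      try {
        for (String file : entry.getValue()) { fs.delete(new Path(file), false); }
      } catch (IOException e) { throw new RuntimeException(e); }
      it.remove();
    }
  }

  @Override public void checkpointed(long windowId) { }
}

The writer is rolled over to a new destination file once the estimated size
crosses the configured block size, so each output stays close to one row
group.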

Points to note:
1. The block-size check will be approximate, since the data is in memory and
ParquetWriter does not expose a flush.
2. This is at best a temporary implementation in the absence of a WAL-based
approach.

I would like to take a crack at this operator based on community feedback.

Thoughts?

Thanks,
Dev

On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <tushar@datatorrent.com>
wrote:

> Hi Shubham,
>
> +1 for the Parquet  writer.
>
> I doubt we could leverage the recovery mechanism provided by
> AbstractFileOutputOperator, as ParquetWriter does not expose a flush and
> could write to the underlying stream at any time. To simplify recovery, you
> can write a single file in each checkpoint duration. If that is not an
> option, then you need to use a WAL for recovery rather than operator
> checkpointing to store unpersisted tuples, since checkpointing huge
> state every 30 seconds is costly.
>
> Regards,
> -Tushar.
>
>
> On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <shubham@datatorrent.com>
> wrote:
>
> > Hello Community,
> >
> > Apache Parquet <https://parquet.apache.org/documentation/latest/> is a
> > column-oriented binary file format designed to be extremely efficient and
> > interoperable across the Hadoop ecosystem. It has integrations with most
> > of the Hadoop processing frameworks (Impala, Hive, Pig, Spark, ...) and
> > serialization models (Thrift, Avro, Protobuf), making it easy to use in
> > ETL and processing pipelines.
> >
> > Having an operator to write data to Parquet files would certainly be a
> > good addition to the Malhar library.
> >
> > The underlying implementation
> > <http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
> > for writing data as Parquet requires a subclass of
> > parquet.hadoop.api.WriteSupport that knows how to take an in-memory
> > object and write Parquet primitives through parquet.io.api.RecordConsumer.
> > Currently, there are several WriteSupport implementations, including
> > ThriftWriteSupport, AvroWriteSupport, and ProtoWriteSupport. These
> > WriteSupport implementations are then wrapped as ParquetWriter objects
> > for writing.
> >
> > Parquet writers do not expose a handle to the underlying stream. In order
> > to write data to a Parquet file, all the records (that belong to the file)
> > must be buffered in memory. These records are then compressed and later
> > flushed to the file.
> >
> > To start with, we could support the following features in the operator:
> >
> >    - *Ability to provide a WriteSupport implementation*: The user should
> >    be able to use existing implementations of
> >    parquet.hadoop.api.WriteSupport or provide his/her own implementation.
> >    - *Ability to configure page size*: Refers to the amount of
> >    uncompressed data for a single column that is read before it is
> >    compressed as a unit and buffered in memory to be written out as a
> >    “page”. Default value: 1 MB.
> >    - *Ability to configure Parquet block size*: Refers to the amount of
> >    compressed data that should be buffered in memory before a row group
> >    is written out to disk. Larger block sizes require more memory to
> >    buffer the data; 128 MB / 256 MB is recommended.
> >    - *Flushing files periodically*: The operator would have to flush
> >    files periodically to a specified directory as per the configured
> >    block size. This could be time-based, event-count-based, or size-based.
> > To implement the operator, here's one approach:
> >
> >    1. Extend the existing AbstractFileOutputOperator.
> >    2. Provide methods to add WriteSupport implementations.
> >    3. In the process method, hold the data in memory until we reach a
> >    configured size, and then flush the contents to a file during
> >    endWindow().
> >
> > Please send across your thoughts on this. I would also like to know
> > whether we would be able to leverage the recovery mechanisms provided by
> > AbstractFileOutputOperator using this approach.
> >
> >
> > Thanks,
> > Shubham
> >
>
