apex-dev mailing list archives

From Pramod Immaneni <pra...@datatorrent.com>
Subject Re: [malhar-users] Re: How to use FileSplitter to read huge file
Date Wed, 28 Oct 2015 18:46:25 GMT
Thanks Chandni

On Wed, Oct 28, 2015 at 11:31 AM, Chandni Singh <chandni@datatorrent.com>
wrote:

> Folks,
>
> You can comment on these pull requests:
> https://github.com/DataTorrent/docs/pull/2
> https://github.com/DataTorrent/docs/pull/3
>
> Chandni
>
>
> On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh <chandni@datatorrent.com>
> wrote:
>
> > Hi All,
> >
> > We have created tutorials for FileSplitter and BlockReader here:
> >
> > https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
> > https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md
> >
> > Please have a look. Any feedback is appreciated.
> >
> > Thanks,
> > Chandni
> >
> > On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh <chandni@datatorrent.com>
> > wrote:
> >
> >> Hi Vk,
> >>
> >> Please find a CSV block reader here, and let me know if you have
> >> questions. I have also added a test, and it seems to be working fine.
> >>
> >> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
> >>
> >> Please note that the BlockReader API has changed considerably from the
> >> one you have been using.
> >>
> >> Thanks,
> >> Chandni
> >>
> >> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <chandni@datatorrent.com>
> >> wrote:
> >>
> >>> Hi vk,
> >>>
> >>> I think you don't need to override readBlock() in AbstractBlockReader.
> >>>
> >>> A simpler way is to use ReadAheadLineReaderContext as the readerContext
> >>> and provide an implementation that converts the bytes to the CSV bean.
> >>>
> >>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
> >>> {
> >>>   public CSVBeanReader()
> >>>   {
> >>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
> >>>   }
> >>>
> >>>   @Override protected CSVBean convertToRecord(byte[] bytes)
> >>>   {
> >>>     //TODO: convert bytes to bean
> >>>     return new CSVBean(bytes);
> >>>   }
> >>> }
> >>>
> >>> Are you using supercsv? I think there is a way to convert bytes to a CSV
> >>> record using it. I may have that example somewhere; I will look it up
> >>> and let you know.
> >>>
> >>> Chandni
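
For the conversion step left as a TODO above, here is a minimal sketch in plain Java, independent of Malhar and Super CSV. `CsvBean`, its three fields, and `LineToBean.convert` are hypothetical names used only for illustration, and the naive split on commas ignores quoting and escaping, which a CSV library such as Super CSV would handle. It assumes the reader context hands over exactly one complete line per call, and it returns null for a line with the wrong column count so the caller can skip it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical three-column bean; a real one would mirror the CSV header.
final class CsvBean {
    final String id;
    final String name;
    final String city;

    CsvBean(String id, String name, String city) {
        this.id = id;
        this.name = name;
        this.city = city;
    }
}

final class LineToBean {
    static final int EXPECTED_COLUMNS = 3;

    // Converts the bytes of ONE complete line into a bean.
    // Returns null when the column count is wrong (for example a partial line).
    static CsvBean convert(byte[] lineBytes) {
        // trim() drops surrounding whitespace, including a trailing '\r'.
        String line = new String(lineBytes, StandardCharsets.UTF_8).trim();
        // A limit of -1 keeps trailing empty columns.
        String[] cols = line.split(",", -1);
        if (cols.length != EXPECTED_COLUMNS) {
            return null;
        }
        return new CsvBean(cols[0], cols[1], cols[2]);
    }
}
```

Returning null rather than throwing matches the "if record is partial, ignore the record" check that appears in the readBlock() loop quoted in this thread.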
> >>>
> >>>
> >>>
> >>> On Tue, Sep 29, 2015 at 2:06 PM, vk <venkateshkkothapalli@gmail.com>
> >>> wrote:
> >>>
> >>>>  Here is a detailed description of the problem.
> >>>>
> >>>>>
> >>>>> My file size: *7,590,177 bytes*
> >>>>>
> >>>>> File splitter block size config:
> >>>>>
> >>>>> <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
> >>>>> <value>*16806*</value>
> >>>>>
> >>>>>
> >>>>> *MyBlockReader Implementation:*
> >>>>>
> >>>>> @Override
> >>>>> protected void readBlock(BlockMetadata blockMetadata) throws IOException {
> >>>>>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
> >>>>>   ReaderContext.Entity entity;
> >>>>>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
> >>>>>   while ((entity = readerContext.next()) != null) {
> >>>>>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
> >>>>>     Packages record = convertToRecord(entity.getRecord());
> >>>>>     //if record is partial, ignore the record
> >>>>>     if (record != null) {
> >>>>>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
> >>>>>       data.emit(record);
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>>
> >>>>> @Override
> >>>>> protected Packages convertToRecord(byte[] data) {
> >>>>>   Packages bean = null;
> >>>>>   try {
> >>>>>     bean = csvReader.read(Packages.class, Packages.COLS);
> >>>>>   } catch (IOException e) {
> >>>>>     e.printStackTrace();
> >>>>>   }
> >>>>>   return bean;
> >>>>> }
> >>>>>
> >>>>>
> >>>>> Based on the above, when blocks are created a record might be split
> >>>>> across two blocks. When reading the blocks and converting them to
> >>>>> beans, the reader has to set the offsets appropriately to merge the
> >>>>> split record into one and process it. It looks like the API already
> >>>>> handles this when *readerContext.initialize(stream, blockMetadata,
> >>>>> consecutiveBlock)* is called, but when I ran the snippet above, the
> >>>>> following error was thrown because of the split record. Can you
> >>>>> please suggest a fix?
> >>>>>
> >>>>> *Exception:*
> >>>>>
> >>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR engine.StreamingContainer run - Operator set [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]] stopped running due to an exception.
> >>>>> *java.lang.IllegalArgumentException: the nameMapping array and the number of columns read should be the same size (nameMapping length = 24, columns = 5)*
> >>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
> >>>>> at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
> >>>>> at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
> >>>>> at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
> >>>>> at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
> >>>>> at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
> >>>>>
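
The split-record situation described above can be sketched without Malhar. This is not Malhar's implementation; it is a minimal plain-Java illustration of the general read-ahead technique, and `ReadAheadSketch`, `readBlock`, and `readAll` are hypothetical names. Every block except the first discards bytes up to and including its first newline, and every block keeps reading past its own end to finish the last line it started. Each line is then emitted exactly once, by the block in which it starts, whatever the block size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReadAheadSketch {

    // Returns the lines owned by the block [offset, offset + length).
    static List<String> readBlock(byte[] file, int offset, int length) {
        int end = Math.min(offset + length, file.length);
        int pos = offset;

        if (offset != 0) {
            // Not the first block: the previous block has already read ahead
            // through the first newline at or after this offset, so skip it.
            while (pos < file.length && file[pos] != '\n') {
                pos++;
            }
            pos++; // step past the newline
        }

        List<String> records = new ArrayList<>();
        // Emit every line that starts at or before the block end, reading
        // past the end if necessary to complete the last one.
        while (pos <= end && pos < file.length) {
            int lineStart = pos;
            while (pos < file.length && file[pos] != '\n') {
                pos++;
            }
            records.add(new String(file, lineStart, pos - lineStart, StandardCharsets.UTF_8));
            pos++; // step past the newline
        }
        return records;
    }

    // Splits the file into fixed-size blocks and reads them one by one.
    static List<String> readAll(byte[] file, int blockSize) {
        List<String> all = new ArrayList<>();
        for (int offset = 0; offset < file.length; offset += blockSize) {
            all.addAll(readBlock(file, offset, blockSize));
        }
        return all;
    }
}
```

Note that in the snippet quoted above, convertToRecord() ignores its byte[] argument and reads from a separate CsvBeanReader wrapped around the same stream. That reader knows nothing about block boundaries, which may be why a partial row (5 columns instead of 24) reaches Super CSV.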
> >>>>>
> >>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
> >>>>>
> >>>>>
> >>>>> Routing to dev@apex
> >>>>>
> >>>>> Amol
> >>>>>
> >>>>>
> >>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <chir...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Pramod,
> >>>>>>
> >>>>>> Thanks for the reply; it is working.
> >>>>>>
> >>>>>> I have one more question: how do I decide the block size?
> >>>>>>
> >>>>>> As per my understanding,
> >>>>>>
> >>>>>> noofBlocks=filesize / blocksize
> >>>>>>
> >>>>>> With this, some records may be split across two blocks, so when
> >>>>>> converting a record we don't have the complete data in one block.
> >>>>>>
> >>>>>> How do I handle this?
> >>>>>>
> >>>>>> Thanks in advance.
> >>>>>>
> >>>>>> Thanks -Chiru
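
To make the block-count estimate above concrete, here is a small worked example in Java using the figures quoted elsewhere in this thread (a 7,590,177-byte file and a blockSize of 16806). It assumes the splitter cuts purely on byte offsets and that the last block simply holds the remainder; `BlockMath` and its methods are hypothetical names, not part of Malhar.

```java
final class BlockMath {

    private static void check(long fileSize, long blockSize) {
        if (fileSize < 0 || blockSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and blockSize > 0");
        }
    }

    // Number of blocks needed to cover fileSize bytes (ceiling division).
    static long blockCount(long fileSize, long blockSize) {
        check(fileSize, blockSize);
        return (fileSize + blockSize - 1) / blockSize;
    }

    // Length in bytes of the final block; 0 for an empty file.
    static long lastBlockLength(long fileSize, long blockSize) {
        check(fileSize, blockSize);
        if (fileSize == 0) {
            return 0;
        }
        long remainder = fileSize % blockSize;
        return remainder == 0 ? blockSize : remainder;
    }
}
```

With these figures, blockCount(7590177L, 16806L) gives 452: 451 full blocks plus a final block of 10,671 bytes. Because the cut points ignore line boundaries, blocks will typically begin and end mid-record, which is why the reader has to deal with split records.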
> >>>>>>
> >>>>>>
> >>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
> >>>>>>>
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> I would like to read a large file using FileSplitter and emit
> >>>>>>> tuples, so I have written the code below.
> >>>>>>>
> >>>>>>>
> >>>>>>> public class Reader extends
> >>>>>>> AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>{
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> protected Data convertToRecord(byte[] data)  { ///
> >>>>>>> }
> >>>>>>>
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> In my application class I have created the FileSplitter and Reader
> >>>>>>> objects and connected them through a stream.
> >>>>>>>
> >>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
> >>>>>>> reader.blocksMetadataInput);
> >>>>>>>
> >>>>>>>
> >>>>>>> In the properties file I am passing the directory path like
> >>>>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
> >>>>>>>
> >>>>>>> When I run the application I get the error below:
> >>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR engine.StreamingContainer run - Abandoning deployment of operator OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]] due to setup failure.*
> >>>>>>> *java.lang.IllegalArgumentException: empty files*
> >>>>>>>
> >>>>>>>
> >>>>>>> Please suggest whether my approach is correct.
> >>>>>>> How do I read data using FileSplitter with malhar-library-3.1.0.jar?
> >>>>>>> Please share any sample code.
> >>>>>>>
> >>>>>>> Thanks -Chiranjeevi
> >>>>>>>
