apex-dev mailing list archives

From Chandni Singh <chan...@datatorrent.com>
Subject Re: [malhar-users] Re: How to use FileSplitter to read huge file
Date Thu, 01 Oct 2015 03:37:07 GMT
Hi Vk,

Please find a CSV block reader here, and let me know if you have questions.
I have also added a test and it seems to be working fine.

https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader

Please note that the BlockReader API has changed considerably from the one you
have been using.

Thanks,
Chandni

On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh <chandni@datatorrent.com>
wrote:

> Hi vk,
>
> I think you don't need to override readBlock() in AbstractBlockReader.
>
> A simpler way to do this would be to use ReadAheadLineReaderContext as the
> readerContext and to provide the implementation that converts bytes to the
> CSV bean. The read-ahead context takes care of lines that straddle block
> boundaries, so you don't need to manage the offsets yourself.
>
> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
> {
>   public CSVBeanReader()
>   {
>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
>   }
>
>   @Override protected CSVBean convertToRecord(byte[] bytes)
>   {
>     //TODO: convert bytes to bean
>     return new CSVBean(bytes);
>   }
> }
>
> Are you using Super CSV? I think there is a way to convert bytes to a CSV
> record with it, and I may have that example somewhere which I will look up
> and let you know.
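>
> In the meantime, here is a rough sketch of that conversion using Super CSV's
> CsvBeanReader (imports from org.supercsv.io, org.supercsv.prefs and java.io;
> the CSVBean class and its COLS name mapping are assumptions based on the
> example above):
>
> @Override protected CSVBean convertToRecord(byte[] bytes)
> {
>   // ReadAheadLineReaderContext hands convertToRecord one complete line at a time
>   try (CsvBeanReader lineReader = new CsvBeanReader(
>       new InputStreamReader(new ByteArrayInputStream(bytes)),
>       CsvPreference.STANDARD_PREFERENCE)) {
>     // COLS maps the CSV columns to the bean's fields
>     return lineReader.read(CSVBean.class, CSVBean.COLS);
>   } catch (IOException e) {
>     // returning null skips an unparseable line
>     return null;
>   }
> }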
>
> Chandni
>
>
>
> On Tue, Sep 29, 2015 at 2:06 PM, vk <venkateshkkothapalli@gmail.com>
> wrote:
>
>>  Here is a detailed description of the problem.
>>
>>>
>>> My file size: *7,590,177 bytes*
>>>
>>> File splitter block size config:
>>>
>>>  <name>dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize</name>
>>>  <value>*16806*</value>
>>>
>>>
>>> *MyBlockReader Implementation:*
>>>
>>> @Override
>>> protected void readBlock(BlockMetadata blockMetadata) throws IOException
>>> {
>>>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
>>>   ReaderContext.Entity entity;
>>>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
>>>   while ((entity = readerContext.next()) != null) {
>>>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
>>>     Packages record = convertToRecord(entity.getRecord());
>>>     // if record is partial, ignore the record
>>>     if (record != null) {
>>>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
>>>       data.emit(record);
>>>     }
>>>   }
>>> }
>>>
>>>
>>> @Override
>>> protected Packages convertToRecord(byte[] data)
>>> {
>>>   Packages bean = null;
>>>   try {
>>>     bean = csvReader.read(Packages.class, Packages.COLS);
>>>   } catch (IOException e) {
>>>     e.printStackTrace();
>>>   }
>>>   return bean;
>>> }
>>>
>>>
>>> Based on the above, when blocks are created a record might be split across
>>> two different blocks. When reading the blocks and converting them to beans,
>>> the reader has to set the offsets appropriately so the split record can be
>>> merged back into one and processed. It looks like this is already handled by
>>> the API when *readerContext.initialize(stream, blockMetadata,
>>> consecutiveBlock)* is called, but when the above snippet is executed, the
>>> following error is thrown because of the split record (the partial line has
>>> only 5 of the 24 expected columns). Can you please suggest a fix?
>>>
>>> *Exception:*
>>>
>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
>>> engine.StreamingContainer run - Operator set
>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff,
>>> 0,
>>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=<null>]]]]
>>> stopped running due to an exception.
>>> *java.lang.IllegalArgumentException: the nameMapping array and the
>>> number of columns read should be the same size (nameMapping length = 24,
>>> columns = 5)*
>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
>>> at
>>> com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
>>> at
>>> com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
>>> at
>>> com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
>>> at
>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
>>> at
>>> com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
>>>
>>>
>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
>>>
>>>
>>> Routing to dev@apex
>>>
>>> Amol
>>>
>>>
>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru <chir...@gmail.com> wrote:
>>>
>>>> Hi Pramod,
>>>>
>>>> Thanks for the reply, it is working.
>>>>
>>>> I have one more query: how do I decide the block size?
>>>>
>>>> As per my understanding,
>>>>
>>>> noOfBlocks = fileSize / blockSize
>>>>
>>>> With this, some records may be split across two blocks, so when converting
>>>> the record we don't have the complete data in one block.
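>>>>
>>>> For example, with the figures quoted higher up in this thread (a
>>>> 7,590,177-byte file and a 16,806-byte block size), fileSize / blockSize is
>>>> about 451.6, so the splitter creates 452 blocks, and any record that
>>>> happens to straddle one of the 451 block boundaries is split across two of
>>>> them.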
>>>>
>>>> How do we handle this?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Thanks -Chiru
>>>>
>>>>
>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I would like to read a large file using the FileSplitter and emit tuples,
>>>>> so I have written the code like below.
>>>>>
>>>>>
>>>>> public class Reader extends
>>>>>     AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>
>>>>> {
>>>>>   @Override
>>>>>   protected Data convertToRecord(byte[] data)  { ///
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> In my application class I have created objects for the FileSplitter and
>>>>> Reader classes and connected them through a stream.
>>>>>
>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
>>>>> reader.blocksMetadataInput)
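>>>>>
>>>>> Putting that together, a minimal sketch of the application class (the
>>>>> operator names match the properties below; imports such as
>>>>> com.datatorrent.api.DAG, com.datatorrent.api.StreamingApplication,
>>>>> com.datatorrent.api.annotation.ApplicationAnnotation,
>>>>> com.datatorrent.lib.io.fs.FileSplitter and
>>>>> org.apache.hadoop.conf.Configuration are assumed) could look like:
>>>>>
>>>>> @ApplicationAnnotation(name = "MyFirstApplication")
>>>>> public class MyFirstApplication implements StreamingApplication
>>>>> {
>>>>>   @Override
>>>>>   public void populateDAG(DAG dag, Configuration conf)
>>>>>   {
>>>>>     // fileSplitter scans the configured directory and emits block metadata
>>>>>     FileSplitter fileSplitter = dag.addOperator("fileSplitter", new FileSplitter());
>>>>>     // reader consumes the block metadata and emits converted Data tuples
>>>>>     Reader reader = dag.addOperator("reader", new Reader());
>>>>>     dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput);
>>>>>   }
>>>>> }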
>>>>>
>>>>>
>>>>> In the properties file I am passing the directory path like
>>>>> <name>dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath</name>
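>>>>>
>>>>> along with a matching value entry for the input directory (the path below
>>>>> is only a placeholder):
>>>>>
>>>>>  <value>/path/to/input/dir</value>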
>>>>>
>>>>> When I run the application I am getting the below error:
>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
>>>>> engine.StreamingContainer run - Abandoning deployment of operator
>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
>>>>> 0,
>>>>> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=<null>]]]
>>>>> due to setup failure.*
>>>>> *java.lang.IllegalArgumentException: empty files*
>>>>>
>>>>>
>>>>> Please suggest whether my approach is correct or not. How do I read data
>>>>> using FileSplitter with malhar-library-3.1.0.jar? Please share any sample
>>>>> code.
>>>>>
>>>>> Thanks, Chiranjeevi
>>>>>
>>>
>>>
>
