From: Pramod Immaneni
Date: Wed, 28 Oct 2015 11:46:25 -0700
Subject: Re: [malhar-users] Re: How to use FileSplitter to read huge file
To: dev@apex.incubator.apache.org

Thanks Chandni

On Wed, Oct 28, 2015 at 11:31 AM, Chandni Singh wrote:

> Folks,
>
> You can comment on these pull requests:
> https://github.com/DataTorrent/docs/pull/2
> https://github.com/DataTorrent/docs/pull/3
>
> Chandni
>
> On Wed, Oct 28, 2015 at 11:16 AM, Chandni Singh wrote:
>
> > Hi All,
> >
> > We have created tutorials for FileSplitter and BlockReader here:
> >
> > https://github.com/DataTorrent/docs/blob/master/docs/operators/io/file_splitter.md
> > https://github.com/DataTorrent/docs/blob/master/docs/operators/io/block_reader.md
> >
> > Please have a look. Any feedback is appreciated.
> >
> > Thanks,
> > Chandni
> >
> > On Wed, Sep 30, 2015 at 8:37 PM, Chandni Singh wrote:
> >
> >> Hi Vk,
> >>
> >> Please find a CSV block reader here and let me know if you have
> >> questions. I have also added a test, and it seems to be working fine.
> >>
> >> https://github.com/chandnisingh/Malhar/tree/examples/demos/examples/src/main/java/com/datatorrent/examples/reader
> >>
> >> Please note that the BlockReader API has changed considerably from the
> >> one you have been using.
> >>
> >> Thanks,
> >> Chandni
> >>
> >> On Wed, Sep 30, 2015 at 8:29 AM, Chandni Singh wrote:
> >>
> >>> Hi vk,
> >>>
> >>> I don't think you need to override readBlock() in AbstractBlockReader.
> >>>
> >>> A simpler way is to use ReadAheadLineReaderContext as the
> >>> readerContext and provide an implementation that converts the bytes
> >>> to the CSV bean:
> >>>
> >>> public class CSVBeanReader extends AbstractFSBlockReader<CSVBean>
> >>> {
> >>>   public CSVBeanReader()
> >>>   {
> >>>     this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
> >>>   }
> >>>
> >>>   @Override
> >>>   protected CSVBean convertToRecord(byte[] bytes)
> >>>   {
> >>>     //TODO: convert bytes to bean
> >>>     return new CSVBean(bytes);
> >>>   }
> >>> }
> >>>
> >>> Are you using Super CSV? I think there is a way to convert bytes to a
> >>> CSV record with it; I may have that example somewhere, which I will
> >>> look up and let you know.
> >>>
> >>> Chandni
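A minimal sketch of that bytes-to-bean conversion with Super CSV (not code from the thread; it assumes Super CSV 2.x and reuses the Packages bean and Packages.COLS name mapping that appear later in this thread). Because ReadAheadLineReaderContext hands convertToRecord() one complete line at a time, a throwaway reader over just those bytes parses exactly one record:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.prefs.CsvPreference;

// Sketch only: drops into a reader class like the CSVBeanReader above,
// with Packages / Packages.COLS standing in for the user's bean and mapping.
@Override
protected Packages convertToRecord(byte[] bytes)
{
  try (CsvBeanReader lineReader = new CsvBeanReader(
      new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8),
      CsvPreference.STANDARD_PREFERENCE)) {
    // One line in, at most one bean out; null signals a record to skip.
    return lineReader.read(Packages.class, Packages.COLS);
  } catch (IOException e) {
    return null;
  }
}

Opening a short-lived reader per record costs little next to the file I/O and avoids sharing any parser state across block boundaries.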
> >>>
> >>> On Tue, Sep 29, 2015 at 2:06 PM, vk wrote:
> >>>
> >>>> Here is a detailed description of the problem.
> >>>>
> >>>>> My file size: *7,590,177 bytes*
> >>>>>
> >>>>> FileSplitter block size config:
> >>>>>
> >>>>> dt.application.MyFirstApplication.operator.FileSplitter.prop.blockSize
> >>>>> *16806*
> >>>>>
> >>>>> *MyBlockReader implementation:*
> >>>>>
> >>>>> @Override
> >>>>> protected void readBlock(BlockMetadata blockMetadata) throws IOException
> >>>>> {
> >>>>>   readerContext.initialize(stream, blockMetadata, consecutiveBlock);
> >>>>>   ReaderContext.Entity entity;
> >>>>>   csvReader = new CsvBeanReader(new InputStreamReader(stream, encoding), csvPreference);
> >>>>>   while ((entity = readerContext.next()) != null) {
> >>>>>     counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
> >>>>>     Packages record = convertToRecord(entity.getRecord());
> >>>>>     // if record is partial, ignore the record
> >>>>>     if (record != null) {
> >>>>>       counters.getCounter(ReaderCounterKeys.RECORDS).increment();
> >>>>>       data.emit(record);
> >>>>>     }
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>> @Override
> >>>>> protected Packages convertToRecord(byte[] data)
> >>>>> {
> >>>>>   Packages bean = null;
> >>>>>   try {
> >>>>>     bean = csvReader.read(Packages.class, Packages.COLS);
> >>>>>   } catch (IOException e) {
> >>>>>     e.printStackTrace();
> >>>>>   }
> >>>>>   return bean;
> >>>>> }
> >>>>>
> >>>>> Based on the above, when blocks are created a record might be split
> >>>>> across two different blocks. When reading the blocks and converting
> >>>>> them to beans, the reader has to adjust the offsets appropriately to
> >>>>> merge the split record into one and process it. It looks like this is
> >>>>> already handled by the API when *readerContext.initialize(stream,
> >>>>> blockMetadata, consecutiveBlock)* is called, but when the above
> >>>>> snippet is executed, the following error is thrown because of the
> >>>>> split record. Can you please suggest?
> >>>>>
> >>>>> *Exception:*
> >>>>>
> >>>>> 2015-09-29 12:46:40,384 [2/reader:MyBlockReader] ERROR
> >>>>> engine.StreamingContainer run - Operator set
> >>>>> [OperatorDeployInfo[id=2,name=reader,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},
> >>>>> inputs=[OperatorDeployInfo.InputDeployInfo[portName=blocksMetadataInput,streamId=blockin,sourceNodeId=1,sourcePortName=blocksMetadataOutput,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=]],
> >>>>> outputs=[OperatorDeployInfo.OutputDeployInfo[portName=data,streamId=randomData,bufferServer=]]]]
> >>>>> stopped running due to an exception.
> >>>>> *java.lang.IllegalArgumentException: the nameMapping array and the
> >>>>> number of columns read should be the same size (nameMapping length = 24,
> >>>>> columns = 5)*
> >>>>> at org.supercsv.io.CsvBeanReader.read(CsvBeanReader.java:180)
> >>>>> at com.directv.sms.filesplitterimpl.MyBlockReader.convertToRecord(CsvBlockReader.java:34)
> >>>>> at com.directv.sms.filesplitterimpl.MyBlockReader.readBlock(CsvBlockReader.java:70)
> >>>>> at com.datatorrent.lib.io.block.AbstractBlockReader.processBlockMetadata(AbstractBlockReader.java:208)
> >>>>> at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:127)
> >>>>> at com.datatorrent.lib.io.block.AbstractBlockReader$1.process(AbstractBlockReader.java:123)
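One reading of the exception above, offered as an interpretation rather than a confirmed diagnosis: the convertToRecord() shown ignores its byte[] argument and instead pulls the next line from a CsvBeanReader wrapped around the shared block stream, so at a block boundary it can parse a trailing fragment of a record, 5 columns against the 24-entry name mapping. Parsing entity.getRecord() directly, as in the sketch earlier in this thread, avoids that. The arithmetic below, using vk's own numbers, shows why boundaries land mid-record:

// Illustrative only, using the file size and block size quoted above.
long fileSize = 7_590_177L;
long blockSize = 16_806L;
long noOfBlocks = (fileSize + blockSize - 1) / blockSize;       // ceil -> 452
long lastBlockLength = fileSize - (noOfBlocks - 1) * blockSize; // 10,671 bytes
// Boundaries fall at byte offsets 16806, 33612, ... with no regard for
// line endings, so a record crossing such an offset is complete in neither
// block; the reader context, not the splitter, stitches it back together.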
> >>>>
> >>>> On Monday, 28 September 2015 11:10:46 UTC-7, Amol Kekre wrote:
> >>>>>
> >>>>> Routing to dev@apex
> >>>>>
> >>>>> Amol
> >>>>>
> >>>>> On Mon, Sep 28, 2015 at 1:58 AM, Chiru wrote:
> >>>>>
> >>>>>> Hi Pramod,
> >>>>>>
> >>>>>> Thanks for the reply; it is working.
> >>>>>>
> >>>>>> I have one more query: how should the block size be decided?
> >>>>>>
> >>>>>> As per my understanding,
> >>>>>>
> >>>>>> noOfBlocks = fileSize / blockSize
> >>>>>>
> >>>>>> By this, some records may be split across two blocks, so when
> >>>>>> converting a record we don't have its complete data in one block.
> >>>>>>
> >>>>>> How do we handle this?
> >>>>>>
> >>>>>> Thanks in advance.
> >>>>>>
> >>>>>> Thanks, Chiru
> >>>>>>
> >>>>>> On Thursday, 24 September 2015 12:45:07 UTC+5:30, Chiru wrote:
> >>>>>>>
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> I would like to read a large file using FileSplitter and emit
> >>>>>>> tuples, so I have written code like the below:
> >>>>>>>
> >>>>>>> public class Reader extends
> >>>>>>>     AbstractFSBlockReader.AbstractFSReadAheadLineReader<Data>
> >>>>>>> {
> >>>>>>>   @Override
> >>>>>>>   protected Data convertToRecord(byte[] data) {
> >>>>>>>     // ...
> >>>>>>>   }
> >>>>>>> }
> >>>>>>>
> >>>>>>> In my application class I have created objects for the FileSplitter
> >>>>>>> and Reader classes and connected them through a stream:
> >>>>>>>
> >>>>>>> dag.addStream("blockin", fileSplitter.blocksMetadataOutput,
> >>>>>>>     reader.blocksMetadataInput)
> >>>>>>>
> >>>>>>> In the properties file I am passing the directory path like
> >>>>>>> dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath
> >>>>>>>
> >>>>>>> When I run the application I am getting the below error:
> >>>>>>>
> >>>>>>> *2015-09-24 11:40:03,040 [1/FileSplitter:FileSplitter] ERROR
> >>>>>>> engine.StreamingContainer run - Abandoning deployment of operator
> >>>>>>> OperatorDeployInfo[id=1,name=FileSplitter,type=INPUT,checkpoint={ffffffffffffffff,
> >>>>>>> 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=blocksMetadataOutput,streamId=blockin,bufferServer=]]]
> >>>>>>> due to setup failure.*
> >>>>>>> *java.lang.IllegalArgumentException: empty files*
> >>>>>>>
> >>>>>>> Please suggest whether my approach is correct. How can I read data
> >>>>>>> with FileSplitter using malhar-library-3.1.0.jar? Please share any
> >>>>>>> sample code.
> >>>>>>>
> >>>>>>> Thanks, Chiranjeevi
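To close the loop on "share any sample code": a minimal application sketch of the wiring Chiru describes, assuming Malhar 3.1.0 and the Reader class above. It is untested; the operator names are chosen only to match the property keys quoted earlier in the thread.

import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.fs.FileSplitter;

// Sketch of the DAG described above; not a tested application.
@ApplicationAnnotation(name = "MyFirstApplication")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Operator names must match the ones used in the property keys, e.g.
    // dt.application.MyFirstApplication.operator.fileSplitter.prop.directoryPath
    FileSplitter fileSplitter = dag.addOperator("fileSplitter", new FileSplitter());
    Reader reader = dag.addOperator("reader", new Reader());

    // Same stream as in Chiru's snippet; CONTAINER_LOCAL mirrors the
    // locality shown in the deployment log earlier in the thread.
    dag.addStream("blockin", fileSplitter.blocksMetadataOutput, reader.blocksMetadataInput)
        .setLocality(DAG.Locality.CONTAINER_LOCAL);
  }
}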