From: Peyman Mohajerian <Peyman.Mohajerian@fox.com>
To: user@flume.apache.org
Subject: RE: HDF Sink Additional Bytes added for File Events
Date: Fri, 19 Jul 2013 16:20:30 +0000
Israel,

They are text files, and I did exactly as you said: I unzipped the file using 'GZIPInputStream', created an event for the full content of the file, and compressed it again as part of the sink. This workaround is fine, but there may be cases where you are forced to keep the original format and just want to channel the data without unzipping it, avoiding the extra headers.
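A minimal sketch of that unzip step, using only java.util.zip (the class and helper names here are illustrative, not taken from the actual source):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public final class GzipUtil {
    // Decompress a gzip'd byte array back to the original bytes so the
    // event body holds plain text; the sink can then re-compress safely.
    public static byte[] gunzip(byte[] gzBytes) throws IOException {
        try (GZIPInputStream in =
                 new GZIPInputStream(new ByteArrayInputStream(gzBytes))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int num;
            while ((num = in.read(buf)) != -1) {
                out.write(buf, 0, num);
            }
            return out.toByteArray();
        }
    }
}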
Thanks,
Peyman

From: Israel Ekpo [mailto:israel@aicer.org]
Sent: Thursday, July 18, 2013 7:00 PM
To: user@flume.apache.org
Subject: Re: HDF Sink Additional Bytes added for File Events

Peyman,

I would like to understand the original types of the gzip files your custom source is consuming. Are these binary files or text files before they are compressed?

Is the entire file a single event, or does it contain delimiters that mark where one event ends and another one starts?

You may be able to get around consuming the gzip files by decompressing each file first, before reading it. This way, the uncompressed bytes are not corrupted if additional data gets appended to the event body or headers.

Here are some tools that could help:

http://docs.oracle.com/javase/6/docs/api/java/util/zip/GZIPInputStream.html
http://docs.oracle.com/javase/6/docs/api/java/util/zip/ZipInputStream.html
http://commons.apache.org/proper/commons-compress/apidocs/org/apache/commons/compress/compressors/gzip/package-summary.html

What do you think about this direction?

Author and Instructor for the Upcoming Book and Lecture Series
Massive Log Data Aggregation, Processing, Searching and Visualization with Open Source Software
http://massivelogdata.com

On 12 July 2013 19:36, Peyman Mohajerian <Peyman.Mohajerian@fox.com> wrote:

Hi Guys,

I have a custom source consuming whole 'gz' files as byte arrays, with each file being a single event. I'd like to write the file to HDFS. During the write some additional bytes are added, so the file is corrupted and can no longer be unzipped. I know this is not a good use case for Flume, but I'd like to keep a consistent data-collection design, and I was hoping I could pass full gz files through to HDFS without corruption. Either the 'timestamp' header is causing the issue or the 'text' file format, but I'm not sure. Any solution?

Thanks,
Peyman

XXX.sources = xxx
XXX.channels = MemChannel
XXX.sinks = HDFS

XXX.sources.xxx.type = com.xxx.xxx.xxx.Source
XXX.sources.xxx.channels = MemChannel

XXX.sinks.HDFS.channel = MemChannel
XXX.sinks.HDFS.type = hdfs
XXX.sinks.HDFS.hdfs.path = hdfs://xxxx/user/xxx/xxx/gzfiles/%Y/%m/%d/
XXX.sinks.HDFS.hdfs.fileType = DataStream
XXX.sinks.HDFS.hdfs.filePrefix = xxxx
XXX.sinks.HDFS.hdfs.batchSize = 1
XXX.sinks.HDFS.hdfs.rollSize = 0
XXX.sinks.HDFS.hdfs.idleTimeout = 3
XXX.sinks.HDFS.hdfs.rollInterval = 0
XXX.sinks.HDFS.hdfs.rollCount = 1

XXX.channels.MemChannel.type = memory
XXX.channels.MemChannel.capacity = 1
XXX.channels.MemChannel.transactionCapacity = 1
XXX.channels.MemChannel.byteCapacityBufferPercentage = 100

InputStream in = Toolbox.inputStreamUrlConnection(url, account.getAuth1(), account.getAuth2());
outputStream = new ByteArrayOutputStream();
byte[] buf = new byte[1024]; // tune the buffer size to your needs
int num;
// read the whole remote gz file into memory
while ((num = in.read(buf)) != -1) {
    outputStream.write(buf, 0, num);
}
// the entire file becomes the body of a single Flume event
headers.put("timestamp", String.valueOf(new Date().getTime()));
Event e = EventBuilder.withBody(outputStream.toByteArray(), headers);
getChannelProcessor().processEvent(e);
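One plausible source of the extra bytes in this configuration: with hdfs.fileType = DataStream, the HDFS sink's default text serializer appends a newline after each event body, which alone is enough to break gzip framing in a one-event-per-file setup like the above. A sketch of two possible fixes, assuming the stock serializer properties (worth verifying against the Flume HDFS sink documentation for the version in use):

# Sketch 1: keep DataStream but stop the text serializer from appending
# a newline to each event body (property names assumed from the standard
# BodyTextEventSerializer; verify against your Flume version's docs).
XXX.sinks.HDFS.hdfs.fileType = DataStream
XXX.sinks.HDFS.serializer = text
XXX.sinks.HDFS.serializer.appendNewline = false

# Sketch 2: send uncompressed bodies and let the sink gzip the stream,
# matching the decompress-at-source workaround described above.
# XXX.sinks.HDFS.hdfs.fileType = CompressedStream
# XXX.sinks.HDFS.hdfs.codeC = gzip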