Date: Tue, 12 Feb 2013 14:27:27 -0500
Subject: Re: pipeline writeTextFile spitting out encoded files?
From: Mike Barretta
To: crunch-user@incubator.apache.org

Still debugging this on my end, but to answer your questions:

Compression:
mapred.map.output.compression.codec = org.apache.hadoop.io.compress.DefaultCodec
mapred.compress.map.output = true
mapred.output.compress = true
mapred.output.compression.type = BLOCK
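Of the settings listed, mapred.output.compress = true is the one that governs the final job output; the two map-output settings only affect intermediate map output. mapred.output.compression.codec isn't listed, and as far as I know it defaults to org.apache.hadoop.io.compress.DefaultCodec, which writes zlib-wrapped deflate data. If that is what is happening here (an assumption; this thread hasn't confirmed it), the part files would begin with a zlib header instead of text. A minimal JDK-only Java sketch for checking that; the class and method names are hypothetical, and the header test is only a heuristic:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class DeflateCheck {
    // Heuristic zlib (RFC 1950) header test on the first two bytes:
    // compression method 8 (deflate), window size <= 32K, and
    // (CMF * 256 + FLG) divisible by 31. Some plain-text prefixes, e.g. "x ", also pass.
    static boolean looksLikeZlib(byte[] head) {
        if (head.length < 2) {
            return false;
        }
        int cmf = head[0] & 0xFF;
        int flg = head[1] & 0xFF;
        return (cmf & 0x0F) == 8 && (cmf >> 4) <= 7 && ((cmf << 8) | flg) % 31 == 0;
    }

    // Decompresses a zlib-wrapped stream and decodes the result as UTF-8 text.
    static String inflateToString(InputStream in) {
        try (InflaterInputStream inflater = new InflaterInputStream(in)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = inflater.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo helper: zlib-compresses text with the JDK's default Deflater settings.
    static byte[] deflate(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] compressed = deflate("1\tfoo\t42\n");
        System.out.println(looksLikeZlib(compressed)); // true
        System.out.println(looksLikeZlib("1\tfoo\t42\n".getBytes(StandardCharsets.UTF_8))); // false
        System.out.print(inflateToString(new ByteArrayInputStream(compressed)));
    }
}
```

To try it on a real output file, copy a part file out of HDFS (for example with hadoop fs -get), pass the result of java.nio.file.Files.readAllBytes to looksLikeZlib, and wrap the same bytes in a ByteArrayInputStream for inflateToString. If the bytes inflate to readable TSV, setting mapred.output.compress to false for the job should give plain text files.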

Groovy:
Rocks! One thing I'd actually like to see/do would be to translate the GPars parallelization functions (it has functions like map, reduce, filter, size, sum, min/max, sort, groupBy, and combine: http://gpars.org/guide/guide/3.%20Data%20Parallelism.html) into Crunch.


On Tue, Feb 12, 2013 at 12:30 PM, Josh Wills <jwills@cloudera.com> wrote:
Okay, how about the cluster compression settings?


On Tue, Feb 12, 2013 at 9:17 AM, Mike Barretta <mike.barretta@gmail.com> wrote:
Josh,

Did the swap, but got the same result.

Inside that function() is something like:

emitter.emit([
    it.id,
    it.name,
    it.value
].join("\t"))

If it isn't obvious, I'm trying to output some HDFS tables containing serialized objects to TSV files. Log statements at that emit line show list.join producing a clear-text string.
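For reference, a rough Java equivalent of the Groovy [it.id, it.name, it.value].join("\t") call is sketched here. The Record class and its field types are hypothetical stand-ins for the assembled Thrift object. As with Groovy's join, each field is converted to a string and the pieces are joined with tabs, so the emitted value is already plain text before it reaches writeTextFile:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TsvLine {
    // Hypothetical stand-in for the assembled Thrift record from the thread.
    static final class Record {
        final long id;
        final String name;
        final double value;

        Record(long id, String name, double value) {
            this.id = id;
            this.name = name;
            this.value = value;
        }
    }

    // Rough equivalent of Groovy's [it.id, it.name, it.value].join("\t"):
    // convert each field with String.valueOf and join the pieces with tabs.
    static String toTsv(Record r) {
        List<Object> fields = Arrays.asList(r.id, r.name, r.value);
        return fields.stream()
                .map(String::valueOf)
                .collect(Collectors.joining("\t"));
    }

    public static void main(String[] args) {
        System.out.println(toTsv(new Record(7L, "widget", 1.5)));
    }
}
```

Running main prints 7, widget and 1.5 separated by tab characters.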

Thanks,
Mike



On Tue, Feb 12, 2013 at 11:46 AM, Josh Wills <jwills@cloudera.com> wrote:
I haven't seen that one before. I'm assuming there's some code in the function() (or in the assembler?) that converts the obj (which is a Thrift record at some point?) to a string before it gets emitted.

To make sure it's not a Crunch bug, would you mind writing:

import org.apache.crunch.io.To
crunchPipeline.write(collection, To.textFile("$outputPath/$outputDir"))

in place of writeTextFile? We do some additional checks in writeTextFile, and I want to be sure we didn't screw something up.

I'm also curious about your Hadoop cluster version and the compression settings on your job/cluster (see https://ccp.cloudera.com/display/CDHDOC/Snappy+Installation for references to the parameters). Also, how do you like Groovy as a language for writing Crunch pipelines? I haven't used it since the Grails days, but I have some friends at SAS who love it.

Josh

On Tue, Feb 12, 2013 at 8:33 AM, Mike Barretta <mike.barretta@gmail.com> wrote:
I'm running some simple parallelDos that emit Strings. When I write the resulting PCollection out using pipeline.writeTextFile(), I see garbled output like:

7?%?Ȳx?B?_L?v(ԭ?,??%?o;;??b-s?aaPXI???O??E;u?%k?????Z7??oD?r???e̼rX??/????)??Ƥ?r3l?R-}?+?!*??@!??Q?6?N??=????????v*B?=H??!0?ve??b?d?uZ7??4?H?i??uw??‹)Pxy
?n%?kۣ???v??xaI?wæ??v^?2i?<93\?G?“???N?
??}?/?EG??mK??*?9;vG??Sb?_L??XD?U?M?ݤo?U??c???qwa?q?ԫ.?9??(????H?o?3i|?7Į????B??n?%?\?uxw??Μ???̢??)-?S?su??Ҁ:?????ݹ??#)??V?7?!???????R?>???EZ}v??8ɿ?????ަ%?~?W?pi?|}?/d#??nr?\a?FUh?Yߠ?|sf%M
v)S??. 4$???
3T???^?*?#I????bҀࡑ???x??%?f?Ў??U???h??,~H?T=O
??z]JWt?q?B?e2?

The code (Groovy; function() is a passed-in closure that does the emitter.emit()) looks like:

// parallelDo returns a new PCollection<String>; keep it so that is what gets written
def lines = collection.parallelDo(this.class.name + ":" + table, new DoFn<Pair<ColumnKey, ColumnDataArrayWritable>, String>() {
  @Override
  void process(Pair<ColumnKey, ColumnDataArrayWritable> input, Emitter<String> emitter) {
    // Build one Thrift object per element of the ColumnDataArrayWritable
    input.second().toArray().each {
      def obj = assembler.assemble([PetalUtils.toThrift(input.first(), it)])
      // function() is the passed-in closure that calls emitter.emit()
      function(obj, emitter)
    }
  }
}, Writables.strings())
crunchPipeline.writeTextFile(lines, "$outputPath/$outputDir")

It's worth noting I saw the same output when running a plain word count.

Is this something that's my fault, or is it the cluster, cluster compression, etc.?



--
Director of Data Science
Cloudera
Twitter: @josh_wills




