Mailing-List: contact user-help@crunch.apache.org; run by ezmlm
Reply-To: user@crunch.apache.org
MIME-Version: 1.0
Date: Wed, 13 May 2015 14:05:39 -0700
Subject: Re: question about join
From: Lucy Chen
To: user@crunch.apache.org
Content-Type: multipart/alternative; boundary=001a1135dae82e7a5c0515fcf9a6

--001a1135dae82e7a5c0515fcf9a6
Content-Type: text/plain; charset=UTF-8

Hi Josh,

Those two functions attach a key to each PCollection so that the two can be joined in the next step.
Here is what the functions look like:

public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {

  // Name of the Labels field to use as the join key.
  private String key;

  public KeyOnLabels(String input) {
    this.key = input;
  }

  @Override
  public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
    if (key.equalsIgnoreCase("class_ID"))
      emitter.emit(Pair.of(input.getClassID(), input));
    else if (key.equalsIgnoreCase("sample_ID"))
      emitter.emit(Pair.of(input.getSampleID(), input));
    else
      // Any other key name falls back to the binary indicator.
      emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
  }
}

public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {

  private final static Logger logger = Logger
      .getLogger(KeyOnFeats.class.getName());

  private String key;

  public KeyOnFeats(String input) {
    this.key = input;
  }

  @Override
  public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
    if (key.equals("sample_ID"))
      emitter.emit(Pair.of(input.getSampleID(), input));
    else
      // Nothing is emitted for any other key name.
      logger.error("The key should be specified as sample_ID only!");
  }
}

Meanwhile, I checked the two outputs and they look normal, something like the following:

{"key": "ID1", "value": Feats@14f5e86}
{"key": "ID1", "value": Labels@489983f3}

That's why I feel something weird happened in the join step.

Thanks!

Lucy

On Wed, May 13, 2015 at 12:34 PM, Josh Wills wrote:

> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
> it looks like the labels_data PTable is actually a PCollection<String>,
> which would happen if KeyOnLabels was somehow returning a String in some
> situations despite claiming to return a Pair<String, Labels>.
>
> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen wrote:
>
>> Meanwhile, I just realized that one of my join jobs runs fine even
>> though it also has the Avro type in it:
>>
>> JoinStrategy<String, Feats, Feats> strategy
>>     = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>
>> PTable<String, Pair<Feats, Feats>> joined = strategy.join(input_A,
>>     input_B, JoinType.INNER_JOIN);
>>
>> So the Avro type is probably not the issue.
>>
>> Any advice?
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen wrote:
>>
>>> Hi all,
>>>
>>> I have a join step in my Crunch pipeline, and it looks like the
>>> following:
>>>
>>> // get label data
>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>
>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>     LabelDataParser(), LabelsType);
>>>
>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>
>>> // get features
>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>
>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>     sample_features_inputs);
>>>
>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>
>>> // join labels and features
>>> JoinStrategy<String, Labels, Feats> strategy =
>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>>
>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>
>>> // class Labels
>>> public class Labels implements java.io.Serializable, Cloneable {
>>>
>>>   private String class_ID;
>>>   private String sample_ID;
>>>   private int binary_ind;
>>>
>>>   public Labels() {
>>>     this(null, null, 0);
>>>   }
>>>
>>>   public Labels(String class_ID, String sample_ID, int ind) {
>>>     this.class_ID = class_ID;
>>>     this.sample_ID = sample_ID;
>>>     this.binary_ind = ind;
>>>   }
>>>
>>>   ...
>>> }
>>>
>>> // class Feats
>>> public class Feats implements java.io.Serializable, Cloneable {
>>>
>>>   private String sample_id;
>>>   private String sample_name;
>>>   private Map<String, Float> feat;
>>>
>>>   public Feats() {
>>>     this(null, null, null);
>>>   }
>>>
>>>   public Feats(String id, String name, Map<String, Float> feat) {
>>>     this.sample_id = id;
>>>     this.sample_name = name;
>>>     this.feat = feat;
>>>   }
>>>
>>>   ...
>>> }
>>>
>>> The outputs of labels_data and feats_data both look fine, but the
>>> join step throws the following exception:
>>>
>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>
>>> This issue has bothered me for a while. Has anyone run into a
>>> similar problem? Is there another option that would solve it?
>>>
>>> Btw, I have already run the following join job successfully:
>>>
>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>     Float>>(100);
>>>
>>> PTable<String, Pair<Float, Tuple3<String, String, Float>>> joined =
>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>
>>> So I guess the issue may still be related to the Avro types that I
>>> defined.
>>>
>>> Thanks for your advice.
>>>
>>> Lucy
--001a1135dae82e7a5c0515fcf9a6--
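
A note on the diagnosis quoted in this thread: the idea that a value can claim to be a Pair yet be a String at runtime follows from Java's type erasure. The sketch below is plain Java and does not use Crunch at all. The nested Pair class is only a hypothetical stand-in for org.apache.crunch.Pair, and mislabeled() and firstKey() are illustrative names that do not appear in the thread. It is meant only to show how a ClassCastException of this shape can surface far from the code that produced the wrong element, not to identify the cause of the failure reported here.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureCastDemo {

    // Hypothetical stand-in for org.apache.crunch.Pair (assumption, not the real class).
    static final class Pair<K, V> {
        private final K first;
        private final V second;

        Pair(K first, V second) {
            this.first = first;
            this.second = second;
        }

        K first() { return first; }
        V second() { return second; }
    }

    // Declares a list of pairs, but an unchecked cast lets a bare String in.
    // The compiler only warns; nothing fails at this point.
    @SuppressWarnings("unchecked")
    static List<Pair<String, String>> mislabeled() {
        List<Object> raw = new ArrayList<>();
        raw.add("ID1");
        return (List<Pair<String, String>>) (List<?>) raw;
    }

    // A correctly typed list, for comparison.
    static List<Pair<String, String>> wellTyped() {
        List<Pair<String, String>> table = new ArrayList<>();
        table.add(new Pair<>("ID1", "label"));
        return table;
    }

    // The consumer trusts the declared type. After erasure, get(0) returns
    // Object and the compiler inserts a cast to Pair, which is where a
    // mislabeled element finally fails.
    static String firstKey(List<Pair<String, String>> table) {
        try {
            Pair<String, String> p = table.get(0);
            return p.first();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(firstKey(mislabeled())); // prints "ClassCastException"
        System.out.println(firstKey(wellTyped()));  // prints "ID1"
    }
}
```

The point of the comparison is that both lists have the same static type, so only the element actually stored decides whether the downstream cast succeeds.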