spark-dev mailing list archives

From Li Li <fancye...@gmail.com>
Subject Re: running lda in spark throws exception
Date Wed, 13 Jan 2016 09:17:36 GMT
I will try Spark 1.6.0 to see whether this is a bug in 1.5.2.

On Wed, Jan 13, 2016 at 3:58 PM, Li Li <fancyerii@gmail.com> wrote:
> I have set up a standalone Spark cluster and ran the same code. It
> still failed with the same exception.
> I also preprocessed the data into lines of integers and used the Scala
> code from the LDA example. It still failed.
> The code:
>
> import org.apache.spark.mllib.clustering.{ LDA, DistributedLDAModel }
>
> import org.apache.spark.mllib.linalg.Vectors
>
> import org.apache.spark.SparkContext
>
> import org.apache.spark.SparkContext._
>
> import org.apache.spark.SparkConf
>
>
> object TestLDA {
>
>   def main(args: Array[String]) {
>
>     if(args.length!=4){
>
>       println("need 4 args: inDir iterNum topicNum outDir")
>
>       System.exit(-1)
>
>     }
>
>     val conf = new SparkConf().setAppName("TestLDA")
>
>     val sc = new SparkContext(conf)
>
>     // Load and parse the data
>
>     val data = sc.textFile(args(0))
>
>     val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
>
>     // Index documents with unique IDs
>
>     val corpus = parsedData.zipWithIndex.map(_.swap).cache()
>
>     val topicNum=Integer.valueOf(args(2))
>
>     val iterNum=Integer.valueOf(args(1))
>
>     // Cluster the documents into three topics using LDA
>
>     val ldaModel = new
> LDA().setK(topicNum).setMaxIterations(iterNum).run(corpus)
>
>
>     // Output topics. Each is a distribution over words (matching word
>     // count vectors)
>
>     println("Learned topics (as distributions over vocab of " +
> ldaModel.vocabSize + " words):")
>
>     val topics = ldaModel.topicsMatrix
>
>     for (topic <- Range(0, topicNum)) {
>
>       print("Topic " + topic + ":")
>
>       for (word <- Range(0, ldaModel.vocabSize)) { print(" " +
> topics(word, topic)); }
>
>       println()
>
>     }
>
>
>     // Save the model to the output directory (fourth argument).
>
>     ldaModel.save(sc, args(3))
>
>   }
>
>
> }
>
> The submit script:
>
> ~/spark-1.5.2-bin-hadoop2.6/bin/spark-submit \
>     --class com.mobvoi.knowledgegraph.scala_test.TestLDA \
>
>     --master spark://master:7077 \
>
>     --num-executors 10 \
>
>     --executor-memory 4g \
>
>     --executor-cores 3 \
>
>     scala_test-1.0-jar-with-dependencies.jar \
>
>     /test.txt \
>
>     100 \
>
>     5  \
>
>     /lda_model
>
> test.txt is attached.
>
>
> On Sat, Jan 9, 2016 at 6:21 AM, Bryan Cutler <cutlerb@gmail.com> wrote:
>> Hi Li,
>>
>> I tried out your code and sample data in both local mode and Spark
>> Standalone and it ran correctly with output that looks good.  Sorry, I don't
>> have a YARN cluster setup right now, so maybe the error you are seeing is
>> specific to that.  Btw, I am running the latest Spark code from the master
>> branch.  Hope that helps some!
>>
>> Bryan
>>
>> On Mon, Jan 4, 2016 at 8:42 PM, Li Li <fancyerii@gmail.com> wrote:
>>>
>>> Could anyone help? The problem is very easy to reproduce. What's wrong?
>>>
>>> On Wed, Dec 30, 2015 at 8:59 PM, Li Li <fancyerii@gmail.com> wrote:
>>> > I reproduced the problem with a small data set.
>>> > But I don't know whether my code is correct, because I am not
>>> > familiar with Spark.
>>> > So I am posting my code here first. If it is correct, I will post
>>> > the data.
>>> > One line of my data looks like this:
>>> >
>>> > { "time":"08-09-17","cmtUrl":"2094361"
>>> >
>>> > ,"rvId":"rev_10000020","webpageUrl":"http://www.dianping.com/shop/2094361","word_vec":[0,1,2,3,4,5,6,2,7,8,9
>>> >
>>> > ,10,11,12,13,14,15,16,8,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,32,35,36,37,38,15,39,40,41,42,5,43,44,17,45,46,42,47,26,48,49]}
>>> >
>>> > It's a JSON file that contains webpageUrl and word_vec, which holds
>>> > the encoded words.
>>> > The first step is to parse the input RDD into an RDD of VectorUrl.
>>> > BTW, is it OK if public VectorUrl call(String s) returns null?
>>> > Then, following the example, I index the documents with unique IDs.
>>> > Then I create an RDD that maps id to url, so that after LDA training
>>> > I can find the url of each document, and save this RDD to HDFS.
>>> > Then I create the corpus RDD and train.
>>> >
>>> > The exception stack trace is:
>>> >
>>> > 15/12/30 20:45:42 ERROR yarn.ApplicationMaster: User class threw
>>> > exception: java.lang.IndexOutOfBoundsException: (454,0) not in
>>> > [-58,58) x [-100,100)
>>> > java.lang.IndexOutOfBoundsException: (454,0) not in [-58,58) x
>>> > [-100,100)
>>> > at breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>>> > at
>>> > org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>>> > at
>>> > org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>>> > at
>>> > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> > at
>>> > org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>>> > at
>>> > org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>>> > at
>>> > com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:89)
>>> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> > at
>>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> > at
>>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> > at java.lang.reflect.Method.invoke(Method.java:606)
>>> > at
>>> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
>>> >
>>> >
>>> > ==========Here is my code==============
>>> >
>>> > SparkConf conf = new SparkConf().setAppName(ReviewLDA.class.getName());
>>> >
>>> >     JavaSparkContext sc = new JavaSparkContext(conf);
>>> >
>>> >
>>> >     // Load and parse the data
>>> >
>>> >     JavaRDD<String> data = sc.textFile(inputDir + "/*");
>>> >
>>> >     JavaRDD<VectorUrl> parsedData = data.map(new Function<String,
>>> > VectorUrl>() {
>>> >
>>> >       public VectorUrl call(String s) {
>>> >
>>> >         JsonParser parser = new JsonParser();
>>> >
>>> >         JsonObject jo = parser.parse(s).getAsJsonObject();
>>> >
>>> >         if (!jo.has("word_vec") || !jo.has("webpageUrl")) {
>>> >
>>> >           return null;
>>> >
>>> >         }
>>> >
>>> >         JsonArray word_vec = jo.get("word_vec").getAsJsonArray();
>>> >
>>> >         String url = jo.get("webpageUrl").getAsString();
>>> >
>>> >         double[] values = new double[word_vec.size()];
>>> >
>>> >         for (int i = 0; i < values.length; i++)
>>> >
>>> >           values[i] = word_vec.get(i).getAsInt();
>>> >
>>> >         return new VectorUrl(Vectors.dense(values), url);
>>> >
>>> >       }
>>> >
>>> >     });
>>> >
>>> >
>>> >
>>> >     // Index documents with unique IDs
>>> >
>>> >     JavaPairRDD<Long, VectorUrl> id2doc =
>>> > JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
>>> >
>>> >         new Function<Tuple2<VectorUrl, Long>, Tuple2<Long, VectorUrl>>()
>>> > {
>>> >
>>> >           public Tuple2<Long, VectorUrl> call(Tuple2<VectorUrl, Long> doc_id) {
>>> >
>>> >             return doc_id.swap();
>>> >
>>> >           }
>>> >
>>> >         }));
>>> >
>>> >     JavaPairRDD<Long, String> id2Url = JavaPairRDD.fromJavaRDD(id2doc
>>> >
>>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
>>> > String>>() {
>>> >
>>> >           @Override
>>> >
>>> >           public Tuple2<Long, String> call(Tuple2<Long, VectorUrl>
>>> > id2doc) throws Exception {
>>> >
>>> >             return new Tuple2(id2doc._1, id2doc._2.url);
>>> >
>>> >           }
>>> >
>>> >         }));
>>> >
>>> >     id2Url.saveAsTextFile(id2UrlPath);
>>> >
>>> >     JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(id2doc
>>> >
>>> >         .map(new Function<Tuple2<Long, VectorUrl>, Tuple2<Long,
>>> > Vector>>() {
>>> >
>>> >           @Override
>>> >
>>> >           public Tuple2<Long, Vector> call(Tuple2<Long, VectorUrl>
>>> > id2doc) throws Exception {
>>> >
>>> >             return new Tuple2(id2doc._1, id2doc._2.vec);
>>> >
>>> >           }
>>> >
>>> >         }));
>>> >
>>> >     corpus.cache();
>>> >
>>> >
>>> >     // Cluster the documents into three topics using LDA
>>> >
>>> >     DistributedLDAModel ldaModel = (DistributedLDAModel) new
>>> > LDA().setMaxIterations(iterNumber)
>>> >
>>> >         .setK(topicNumber).run(corpus);
>>> >
>>> > On Wed, Dec 30, 2015 at 3:34 PM, Li Li <fancyerii@gmail.com> wrote:
>>> >> I will try with a portion of the data. Could the HDFS blocks
>>> >> affect Spark? (If so, it will be hard to reproduce.)
>>> >>
>>> >> On Wed, Dec 30, 2015 at 3:22 AM, Joseph Bradley <joseph@databricks.com>
>>> >> wrote:
>>> >>> Hi Li,
>>> >>>
>>> >>> I'm wondering if you're running into the same bug reported here:
>>> >>> https://issues.apache.org/jira/browse/SPARK-12488
>>> >>>
>>> >>> I haven't figured out yet what is causing it.  Do you have a small
>>> >>> corpus which reproduces this error, and which you can share on the
>>> >>> JIRA?  If so, that would help a lot in debugging this failure.
>>> >>>
>>> >>> Thanks!
>>> >>> Joseph
>>> >>>
>>> >>> On Sun, Dec 27, 2015 at 7:26 PM, Li Li <fancyerii@gmail.com> wrote:
>>> >>>>
>>> >>>> I ran my LDA example on a YARN 2.6.2 cluster with Spark 1.5.2.
>>> >>>> It throws an exception at the line:   Matrix topics =
>>> >>>> ldaModel.topicsMatrix();
>>> >>>> But the YARN job history UI reports the job as successful. What's
>>> >>>> wrong with it?
>>> >>>> I submit the job with
>>> >>>> ./bin/spark-submit --class Myclass \
>>> >>>>     --master yarn-client \
>>> >>>>     --num-executors 2 \
>>> >>>>     --driver-memory 4g \
>>> >>>>     --executor-memory 4g \
>>> >>>>     --executor-cores 1 \
>>> >>>>
>>> >>>>
>>> >>>> My code:
>>> >>>>
>>> >>>>    corpus.cache();
>>> >>>>
>>> >>>>
>>> >>>>     // Cluster the documents into three topics using LDA
>>> >>>>
>>> >>>>     DistributedLDAModel ldaModel = (DistributedLDAModel) new
>>> >>>>
>>> >>>>
>>> >>>> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>>> >>>>
>>> >>>>
>>> >>>>     // Output topics. Each is a distribution over words (matching
>>> >>>>     // word count vectors)
>>> >>>>
>>> >>>>     System.out.println("Learned topics (as distributions over vocab of "
>>> >>>>         + ldaModel.vocabSize() + " words):");
>>> >>>>
>>> >>>>     // Line 81, exception here:
>>> >>>>     Matrix topics = ldaModel.topicsMatrix();
>>> >>>>
>>> >>>>     for (int topic = 0; topic < topicNumber; topic++) {
>>> >>>>
>>> >>>>       System.out.print("Topic " + topic + ":");
>>> >>>>
>>> >>>>       for (int word = 0; word < ldaModel.vocabSize(); word++) {
>>> >>>>
>>> >>>>         System.out.print(" " + topics.apply(word, topic));
>>> >>>>
>>> >>>>       }
>>> >>>>
>>> >>>>       System.out.println();
>>> >>>>
>>> >>>>     }
>>> >>>>
>>> >>>>
>>> >>>>     ldaModel.save(sc.sc(), modelPath);
>>> >>>>
>>> >>>>
>>> >>>> Exception in thread "main" java.lang.IndexOutOfBoundsException:
>>> >>>> (1025,0) not in [-58,58) x [-100,100)
>>> >>>>
>>> >>>>         at
>>> >>>> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> >>>>
>>> >>>>         at
>>> >>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>>> >>>>
>>> >>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> >>>> Method)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >>>>
>>> >>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>>> >>>>
>>> >>>>         at
>>> >>>>
>>> >>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> >>>>
>>> >>>>         at
>>> >>>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> >>>>
>>> >>>>         at
>>> >>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>>> >>>>
>>> >>>>         at
>>> >>>> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> >>>>
>>> >>>> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from
>>> >>>> shutdown
>>> >>>> hook
>>> >>>>
>>> >>>> ---------------------------------------------------------------------
>>> >>>> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
>>> >>>> For additional commands, e-mail: dev-help@spark.apache.org
>>> >>>>
>>> >>>
>>>
>>>
>>
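
One possible reading of the exception, which nobody in this thread
confirms: the bounds [-58,58) x [-100,100) describe a topics matrix with
58 rows and 100 columns, and 58 is also the number of entries in the sample
word_vec quoted above. If LDA takes the vocabulary size from the length of
the first document vector (an assumption about the 1.5.x implementation),
then passing word-id sequences of varying length as dense vectors could
produce an out-of-range row such as 454 or 1025. The MLlib LDA example feeds
word-count vectors that all have the same length instead. A minimal sketch
of that conversion in plain Java follows; TermCounts, toTermCounts and
vocabSize are hypothetical names, not Spark API, and the vocabulary size
would have to be computed from the real data.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Spark or of the code posted in this thread.
final class TermCounts {

    // Turns a sequence of word ids (one entry per token, as in "word_vec")
    // into a count vector of fixed length vocabSize: result[w] is the number
    // of times word id w occurs in the document.
    static double[] toTermCounts(int[] wordIds, int vocabSize) {
        double[] counts = new double[vocabSize];
        for (int id : wordIds) {
            if (id < 0 || id >= vocabSize) {
                throw new IllegalArgumentException(
                    "word id " + id + " outside vocabulary of size " + vocabSize);
            }
            counts[id] += 1.0;
        }
        return counts;
    }

    public static void main(String[] args) {
        // First few ids of the sample record quoted in the thread.
        int[] wordIds = {0, 1, 2, 3, 4, 5, 6, 2, 7, 8, 9};
        // Assumed vocabulary size, for illustration only.
        int vocabSize = 10;
        // Word id 2 occurs twice, every other id once.
        System.out.println(Arrays.toString(toTermCounts(wordIds, vocabSize)));
    }
}
```

The resulting array has the same length for every document, so it could be
passed to Vectors.dense(...) in place of the raw id sequence used in the
code above. Whether this actually removes the exception is untested here.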


