flink-user mailing list archives

From <rimin...@sina.cn>
Subject flink Broadcast
Date Thu, 23 Mar 2017 14:11:05 GMT
Hi all,
I have 36,000 documents, and each document has been transformed into a vector (one vector per document, all with the same dimension), so I have a DataSet:
------------------------
val data: DataSet[(String, SparseVector)] = ... // 36000 records
val toData = data.collect()                     // pulls every vector to the client

val docSims = data.map { x =>
  val fromId = x._1
  val docsims = toData
    .filter { y => y._1 != fromId }
    .map { y =>
      val score = 1 - cosDistance(x._2, y._2) // cosine similarity from distance
      (y._1, score)
    }
    .toList
    .sortWith { (a, b) => a._2 > b._2 }
    .take(20) // keep the 20 most similar documents
  (fromId, docsims)
}
docSims.writeAsText(file)
...
When I run the job on YARN, it fails with the following error:
	java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
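For reference, the same top-20 similarity computation can also be expressed with a Flink broadcast variable instead of calling collect() on the client, which keeps the vector set inside the cluster. This is only a minimal sketch: the broadcast name "docVectors" is chosen here for illustration, and it assumes the same cosDistance helper as above.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.SparseVector

import scala.collection.JavaConverters._

val docSims = data
  .map(new RichMapFunction[(String, SparseVector), (String, List[(String, Double)])] {
    var toData: Seq[(String, SparseVector)] = _

    override def open(parameters: Configuration): Unit = {
      // Each task retrieves the broadcast vector set once, at open time
      toData = getRuntimeContext
        .getBroadcastVariable[(String, SparseVector)]("docVectors")
        .asScala
    }

    override def map(x: (String, SparseVector)): (String, List[(String, Double)]) = {
      val sims = toData
        .filter { y => y._1 != x._1 }
        .map { y => (y._1, 1 - cosDistance(x._2, y._2)) } // assumes the cosDistance helper above
        .sortWith { (a, b) => a._2 > b._2 }
        .take(20)
        .toList
      (x._1, sims)
    }
  })
  .withBroadcastSet(data, "docVectors") // register the data set under the name used in open()
```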


Can someone tell me what is going wrong? Thank you.