spark-user mailing list archives

From Emre Sevinc <emre.sev...@gmail.com>
Subject Re: How to deal with code that runs before foreach block in Apache Spark?
Date Wed, 06 May 2015 14:09:21 GMT
Imran, Gerard,

Indeed, your suggestions were correct and they helped me. Thank you for your
replies.

--
Emre

On Tue, May 5, 2015 at 4:24 PM, Imran Rashid <irashid@cloudera.com> wrote:

> Gerard is totally correct -- to expand a little more, I think what you
> want to do is a solrInputDocumentJavaRDD.foreachPartition, instead of
> solrInputDocumentJavaRDD.foreach:
>
>
> solrInputDocumentJavaRDD.foreachPartition(
>   new VoidFunction<Iterator<SolrInputDocument>>() {
>     @Override
>     public void call(Iterator<SolrInputDocument> docItr) {
>       List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
>       // An Iterator cannot be used in a for-each loop, so drain it explicitly
>       while (docItr.hasNext()) {
>         // Add the solrInputDocument to the list of SolrInputDocuments
>         docs.add(docItr.next());
>       }
>       // push things to solr **from the executor, for this partition**
>       // so for this to make sense, you need to be sure solr can handle a bunch
>       // of executors pushing into it simultaneously.
>       addThingsToSolr(docs);
>     }
> });
>
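
A note on the per-partition approach above: collecting a whole partition into one list may use a lot of memory for large partitions. Below is a minimal, Spark-free sketch of draining an iterator in fixed-size batches instead. The class name PartitionBatcher, the method sendInBatches, and the Consumer-based sink are hypothetical names chosen for illustration; in the real job the sink would be whatever pushes a list of documents to Solr (addThingsToSolr in the snippet above), and a sensible batch size is an assumption that would need tuning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PartitionBatcher {

    /**
     * Drains the iterator and hands its items to the sink in lists of at most
     * batchSize elements. Returns the number of batches handed to the sink.
     * (Hypothetical helper, not part of Spark or SolrJ.)
     */
    public static <T> int sendInBatches(Iterator<T> items, int batchSize,
                                        Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<T> buffer = new ArrayList<>(batchSize);
        while (items.hasNext()) {
            buffer.add(items.next());
            if (buffer.size() == batchSize) {
                // Hand over a copy so the sink may keep the list it receives
                sink.accept(new ArrayList<>(buffer));
                buffer.clear();
                batches++;
            }
        }
        // Flush whatever is left over after the last full batch
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for one partition's iterator; the sink just records batches
        List<List<String>> received = new ArrayList<>();
        Iterator<String> partition = Arrays.asList("a", "b", "c", "d", "e").iterator();
        int n = sendInBatches(partition, 2, received::add);
        System.out.println(n + " batches: " + received);
        // prints "3 batches: [[a, b], [c, d], [e]]"
    }
}
```

Inside foreachPartition, the call would then look roughly like sendInBatches(docItr, batchSize, docs -> addThingsToSolr(docs)), still running on the executor for that partition.
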
> On Mon, May 4, 2015 at 8:44 AM, Gerard Maas <gerard.maas@gmail.com> wrote:
>
>> I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
>> is a singleton, I guess that what's going on when running on a cluster is
>> that the call to:
>>
>>  SolrIndexerDriver.solrInputDocumentList.add(elem)
>>
>> is happening on different singleton instances of SolrIndexerDriver
>> in different JVMs, while
>>
>> SolrIndexerDriver.solrServer.commit
>>
>> is happening on the driver.
>>
>> In practical terms, the lists on the executors are being filled in but
>> never committed, while on the driver the opposite is happening: commit is
>> called, but the driver's list is always empty.
>>
>> -kr, Gerard
>>
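
As a rough single-JVM analogy for the explanation above (not a reproduction of what Spark does): the sketch below round-trips a list through Java serialization to stand in for "another JVM has its own copy of the state". In the real job the static fields of SolrIndexerDriver are not serialized at all; each JVM simply initializes its own, but the visible effect is the same, namely that additions made on one copy never show up on the other. The class name CopiedStateDemo and the method shipToOtherProcess are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

public class CopiedStateDemo {

    /**
     * Serializes and deserializes a value, yielding an independent copy.
     * This only imitates state living in a different JVM (assumption made
     * for illustration; Spark is not involved here).
     */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T shipToOtherProcess(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // The list as seen by the "driver"
        ArrayList<String> driverList = new ArrayList<>();

        // The "executor" works on its own copy of the list
        ArrayList<String> executorList = shipToOtherProcess(driverList);
        executorList.add("doc-1");
        executorList.add("doc-2");

        // The executor's additions are invisible to the driver
        System.out.println("executor copy size: " + executorList.size());
        System.out.println("driver copy size: " + driverList.size());
    }
}
```
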
>> On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc <emre.sevinc@gmail.com>
>> wrote:
>>
>>> I'm trying to deal with some code that runs differently in Spark
>>> stand-alone mode and on a Spark cluster. Basically, for each item
>>> in an RDD, I'm trying to add it to a list, and once this is done, I want to
>>> send this list to Solr.
>>>
>>> This works perfectly fine when I run the following code in stand-alone
>>> mode of Spark, but does not work when the same code is run on a cluster.
>>> On a cluster, it is as if the "send to Solr" part of the code is executed
>>> before the list to be sent to Solr is filled with items. I tried to force
>>> the execution by calling solrInputDocumentJavaRDD.collect(); after foreach,
>>> but it does not seem to have any effect.
>>>
>>> // For each RDD
>>> solrInputDocumentJavaDStream.foreachRDD(
>>>         new Function<JavaRDD<SolrInputDocument>, Void>() {
>>>           @Override
>>>           public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {
>>>
>>>             // For each item in a single RDD
>>>             solrInputDocumentJavaRDD.foreach(
>>>                     new VoidFunction<SolrInputDocument>() {
>>>                       @Override
>>>                       public void call(SolrInputDocument solrInputDocument) {
>>>
>>>                         // Add the solrInputDocument to the list of SolrInputDocuments
>>>                         SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>>>                       }
>>>                     });
>>>
>>>             // Try to force execution
>>>             solrInputDocumentJavaRDD.collect();
>>>
>>>
>>>             // After having finished adding every SolrInputDocument to the list,
>>>             // add it to the solrServer, and commit, waiting for the commit to be flushed
>>>             try {
>>>
>>>               // Seems like when run in cluster mode, the list size is zero,
>>>               // therefore the following part is never executed
>>>
>>>               if (SolrIndexerDriver.solrInputDocumentList != null
>>>                       && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
>>>
>>>                 SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
>>>                 SolrIndexerDriver.solrServer.commit(true, true);
>>>                 SolrIndexerDriver.solrInputDocumentList.clear();
>>>               }
>>>             } catch (SolrServerException | IOException e) {
>>>               e.printStackTrace();
>>>             }
>>>
>>>
>>>             return null;
>>>           }
>>>         }
>>> );
>>>
>>>
>>> What should I do so that the sending-to-Solr part executes after all the
>>> SolrInputDocuments have been added to solrInputDocumentList (and also
>>> works in cluster mode)?
>>>
>>>
>>> --
>>> Emre Sevinç
>>>
>>
>>
>


-- 
Emre Sevinc
