beam-user mailing list archives

From Jean-Baptiste Onofré ...@nanthrax.net>
Subject Re: Apache Beam v2.1.0 - Spark Runner Issue
Date Fri, 08 Sep 2017 06:09:41 GMT
Hmm, weird. Let me check.

Regards
JB

On 09/07/2017 09:03 PM, sureshkumarvepari@gmail.com wrote:
> 
> 
> On 2017-09-05 19:01, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
>> Hi guys
>>
>> I created PR #3808:
>>
>> https://github.com/apache/beam/pull/3808
>>
>> It introduces a new Spark 2.x specific runner (in addition to the Spark 1.x one).
>>
>> I have some questions (more for dev) that I left in a comment.
>>
>> It's still a work in progress, as I have to fix the unit tests and discuss the
>> ValidatesRunner tests. However, you can already take a look.
>>
>> Regards
>> JB
>>
>> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
>>> Sure, thanks very much, JB. I look forward to your link.
>>>
>>> -----Original Message-----
>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>> Sent: Monday, September 4, 2017 8:59 AM
>>> To: user@beam.apache.org
>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>
>>> Hi,
>>>
>>> Actually, I'm preparing the PR. I will send the PR link to you.
>>>
>>> Just give me some time to rebase my branch and push.
>>>
>>> Thanks,
>>> Regards
>>> JB
>>>
>>> On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
>>>> Hi JB,
>>>>
>>>> If possible, could you please send me the code/location to download the Spark Runner for Spark 2.x?
>>>>
>>>> Thanks,
>>>> Mahender
>>>>
>>>> -----Original Message-----
>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>> Sent: Friday, September 1, 2017 1:54 AM
>>>> To: user@beam.apache.org
>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>
>>>> Sure, I will send the PR over the weekend and let you know.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
>>>>> Thanks, JB. Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this something that is part of the configuration?
>>>>>
>>>>> -----Original Message-----
>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>> Sent: Thursday, August 31, 2017 12:11 AM
>>>>> To: user@beam.apache.org
>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
>>>>>
>>>>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>>>>>> Hello,
>>>>>>
>>>>>> I am running into a Spark assertion error when running an Apache Beam
>>>>>> pipeline. Below are the details:
>>>>>>
>>>>>> Apache Beam version: 2.1.0
>>>>>>
>>>>>> Spark version: 2.1.0
>>>>>>
>>>>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>>>>>     at scala.Predef$.assert(Predef.scala:179)
>>>>>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>>>>
>>>>>> Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
>>>>>> compatible with Spark v2.1.0?
>>>>>>
>>>>>> Below is the code snippet for the pipeline:
>>>>>>
>>>>>>     PipelineOptionsFactory.register(CSVOptions.class);
>>>>>>     CSVOptions options =
>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>>>>>     options.setRunner(SparkRunner.class);
>>>>>>     options.setSparkMaster("local[4]");
>>>>>>     options.setEnableSparkMetricSinks(false);
>>>>>>
>>>>>>     Pipeline p = Pipeline.create(options);
>>>>>>
>>>>>>     p.apply("ReadMyCSVFile",
>>>>>>             TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>>>>>         .apply(new DataLoader())
>>>>>>         .apply(JdbcIO.<String>write()
>>>>>>             .withDataSourceConfiguration(
>>>>>>                 JdbcIO.DataSourceConfiguration.create(
>>>>>>                         "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>>>>>                     .withUsername("postgres")
>>>>>>                     .withPassword("postgres"))
>>>>>>             .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>>>>>             .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>>>>>                 @Override
>>>>>>                 public void setParameters(String element, PreparedStatement query)
>>>>>>                         throws SQLException {
>>>>>>                     // Split the tab-separated line and bind field j to parameter j + 1.
>>>>>>                     String[] datas = element.split("\t");
>>>>>>                     if (datas.length > 0) {
>>>>>>                         for (int j = 0; j < datas.length; j++) {
>>>>>>                             query.setString(j + 1, datas[j]);
>>>>>>                         }
>>>>>>                     }
>>>>>>                 }
>>>>>>             }));
>>>>>>
>>>>>>     SparkRunner runner = SparkRunner.create(options);
>>>>>>     runner.run(p).waitUntilFinish();
>>>>>>
>>>>>> Any help would be greatly appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mahender
>>>>>>
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> jbonofre@apache.org
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>>
>>>>
>>>>
>>>
>>>
>>
> Hi JB,
> I have tried this PR and I still see the same error, i.e. "Caused by: java.lang.AssertionError:
> assertion failed: copyAndReset must return a zero value copy".
> Is the change from Iterables to Iterators supposed to fix this issue, or am I missing
> something else?
> Please confirm.
> 
> Thanks in Advance.
> Suri
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
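The assertion in the stack trace comes from Spark's AccumulatorV2 contract: when an accumulator is serialized from the driver (AccumulatorV2.writeReplace in the trace), Spark calls copyAndReset() and asserts that the returned copy reports isZero(). The sketch below models that contract in plain Java with no Spark dependency. SimpleAccumulator, CountsAccumulator, BrokenAccumulator, Totals and resetCopyForShipping are hypothetical names that imitate the check rather than reproduce Spark's API, and the broken variant shows only one possible way to trip the assertion (an isZero() that relies on an equals() the state type never overrides); it is not a diagnosis of the Beam Spark runner.

```java
import java.util.HashMap;
import java.util.Map;

public class AccumulatorContractDemo {

    // Hypothetical, simplified stand-in for part of Spark's AccumulatorV2.
    // It is NOT the real org.apache.spark.util.AccumulatorV2 class.
    abstract static class SimpleAccumulator<IN, OUT> {
        abstract boolean isZero();
        abstract SimpleAccumulator<IN, OUT> copyAndReset();
        abstract void add(IN v);
        abstract OUT value();

        // Imitates the driver-side check: the reset copy must report isZero().
        final SimpleAccumulator<IN, OUT> resetCopyForShipping() {
            SimpleAccumulator<IN, OUT> copy = copyAndReset();
            if (!copy.isZero()) {
                throw new AssertionError("assertion failed: copyAndReset must return a zero value copy");
            }
            return copy;
        }
    }

    // Honors the contract: the zero value is an empty map, and isZero() checks exactly that.
    static class CountsAccumulator extends SimpleAccumulator<String, Map<String, Long>> {
        private final Map<String, Long> counts = new HashMap<>();
        @Override boolean isZero() { return counts.isEmpty(); }
        @Override CountsAccumulator copyAndReset() { return new CountsAccumulator(); }
        @Override void add(String key) { counts.merge(key, 1L, Long::sum); }
        @Override Map<String, Long> value() { return counts; }
    }

    // Hypothetical state type that does NOT override equals(), so equality is identity.
    static class Totals {
        long sum;
    }

    // Violates the contract: isZero() compares a freshly built zero with equals(),
    // which falls back to identity, so even a brand-new copy is never "zero".
    static class BrokenAccumulator extends SimpleAccumulator<Long, Totals> {
        private final Totals state = new Totals();
        private static Totals zero() { return new Totals(); }
        @Override boolean isZero() { return zero().equals(state); }
        @Override BrokenAccumulator copyAndReset() { return new BrokenAccumulator(); }
        @Override void add(Long v) { state.sum += v; }
        @Override Totals value() { return state; }
    }

    public static void main(String[] args) {
        CountsAccumulator good = new CountsAccumulator();
        good.add("a");
        good.add("a");
        System.out.println("good value: " + good.value());
        System.out.println("good reset copy is zero: " + good.resetCopyForShipping().isZero());

        BrokenAccumulator broken = new BrokenAccumulator();
        broken.add(5L);
        try {
            broken.resetCopyForShipping();
            System.out.println("broken: no error");
        } catch (AssertionError e) {
            System.out.println("broken: " + e.getMessage());
        }
    }
}
```

The design point is that isZero() must be true for whatever copyAndReset() returns; defining "zero" structurally (an empty map, a count of 0) rather than by comparison with a freshly allocated object keeps the two methods consistent.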
