beam-user mailing list archives

From: Mahender Devaruppala <mahend...@apporchid.com>
Subject: RE: Apache Beam v2.1.0 - Spark Runner Issue
Date: Mon, 04 Sep 2017 14:00:43 GMT
Sure, thanks very much, JB. I'll look forward to your link.

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net] 
Sent: Monday, September 4, 2017 8:59 AM
To: user@beam.apache.org
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

Actually, I'm preparing the PR. I will send the PR link to you.

Give me just some time to rebase my branch and push.

Thanks,
Regards
JB

On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
> Hi JB,
> 
> If possible, could you please send me the code/location to download the
> Spark runner for Spark 2.x?
> 
> Thanks,
> Mahender
> 
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> Sent: Friday, September 1, 2017 1:54 AM
> To: user@beam.apache.org
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> 
> Sure, I will send the PR during the weekend. I will let you know.
> 
> Regards
> JB
> 
> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
>> Thanks JB.  Could you please point me to the location of the Spark runner
>> specific to Spark 2.x, or is this part of some configuration?
>>
>> -----Original Message-----
>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>> Sent: Thursday, August 31, 2017 12:11 AM
>> To: user@beam.apache.org
>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>
>> Hi,
>>
>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
>>
>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
>>
>> Regards
>> JB
>>
>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>>> Hello,
>>>
>>> I am running into a Spark assertion error when running an Apache Beam
>>> pipeline; details below:
>>>
>>> Apache Beam version: 2.1.0
>>>
>>> Spark version: 2.1.0
>>>
>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>>         at scala.Predef$.assert(Predef.scala:179)
>>>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>
>>> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
>>> compatible with Spark v2.1.0?
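>>>
>>> (That assertion comes from Spark 2.x's AccumulatorV2 contract: on
>>> serialization, writeReplace() asserts that copyAndReset() returns a
>>> zero-valued accumulator. Below is a minimal sketch of a conforming
>>> Spark 2.x accumulator, with illustrative names, assuming only the
>>> public AccumulatorV2 API:)
>>>
>>>     import org.apache.spark.util.AccumulatorV2;
>>>
>>>     // Illustrative long-sum accumulator. Spark's default copyAndReset()
>>>     // calls copy() then reset(); the result must satisfy isZero(), or
>>>     // writeReplace() fails with the assertion seen above.
>>>     public class LongSumAccumulator extends AccumulatorV2<Long, Long> {
>>>         private long sum = 0L;
>>>
>>>         @Override public boolean isZero() { return sum == 0L; }
>>>
>>>         @Override public AccumulatorV2<Long, Long> copy() {
>>>             LongSumAccumulator acc = new LongSumAccumulator();
>>>             acc.sum = this.sum;
>>>             return acc;
>>>         }
>>>
>>>         @Override public void reset() { sum = 0L; }
>>>
>>>         @Override public void add(Long v) { sum += v; }
>>>
>>>         @Override public void merge(AccumulatorV2<Long, Long> other) {
>>>             sum += other.value();
>>>         }
>>>
>>>         @Override public Long value() { return sum; }
>>>     }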
>>>
>>> Below is the code snippet for the pipeline:
>>>
>>> PipelineOptionsFactory.register(CSVOptions.class);
>>> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>> options.setRunner(SparkRunner.class);
>>> options.setSparkMaster("local[4]");
>>> options.setEnableSparkMetricSinks(false);
>>>
>>> Pipeline p = Pipeline.create(options);
>>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>>  .apply(new DataLoader())
>>>  .apply(JdbcIO.<String>write()
>>>      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>>>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>>          .withUsername("postgres")
>>>          .withPassword("postgres"))
>>>      .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>>      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>>          public void setParameters(String element, PreparedStatement query) throws SQLException {
>>>              String[] datas = element.split("\t");
>>>              if (datas.length > 0) {
>>>                  for (int j = 0; j < datas.length; j++) {
>>>                      query.setString(j + 1, datas[j]);
>>>                  }
>>>              }
>>>          }
>>>      }));
>>>
>>> SparkRunner runner = SparkRunner.create(options);
>>> runner.run(p).waitUntilFinish();
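>>>
>>> (Side note: since the runner is already set via
>>> options.setRunner(SparkRunner.class), the explicit SparkRunner.create()
>>> step is typically unnecessary; a minimal sketch of the usual Beam idiom,
>>> assuming the same options object:)
>>>
>>>     Pipeline p = Pipeline.create(options);
>>>     // ... apply the same transforms as above ...
>>>     p.run().waitUntilFinish();  // Pipeline.run() instantiates the runner from options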
>>>
>>> Any help would be greatly appreciated.
>>>
>>> Thanks,
>>>
>>> Mahender
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
> 
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com