beam-user mailing list archives

From Lukasz Cwik <lc...@google.com>
Subject Re: Apache Beam v2.1.0 - Spark Runner Issue
Date Wed, 30 Aug 2017 22:20:55 GMT
To my knowledge you should use Spark 1.6.3, since that is what is declared
as the spark.version in the project's root pom.xml.
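
For reference, pinning the Spark dependencies in an application's pom.xml might look roughly like the sketch below. This is an illustration, not the project's actual build file: the exact artifact IDs (e.g. the `_2.10` Scala suffix) and which Spark modules you need depend on your project, so check them against the Beam Spark runner's own pom before copying.

```xml
<properties>
  <!-- Beam 2.1.0's Spark runner is declared against Spark 1.6.x,
       not Spark 2.x (assumption: match spark.version from Beam's root pom) -->
  <spark.version>1.6.3</spark.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-spark</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- Spark itself is typically marked provided so the cluster's
       version is used at runtime -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
```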

On Wed, Aug 30, 2017 at 2:45 PM, Mahender Devaruppala <
mahenderd@apporchid.com> wrote:

> Hello,
>
>
>
> I am running into a Spark assertion error when running an Apache Beam
> pipeline; the details are below:
>
>
>
> Apache Beam version: 2.1.0
>
> Spark version: 2.1.0
>
>
>
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>         at scala.Predef$.assert(Predef.scala:179)
>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
>
>
> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
> compatible with Spark v2.1.0?
>
>
>
> Below is the code snippet for the pipeline:
>
>
>
> PipelineOptionsFactory.register(CSVOptions.class);
> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
> options.setRunner(SparkRunner.class);
> options.setSparkMaster("local[4]");
> options.setEnableSparkMetricSinks(false);
>
> Pipeline p = Pipeline.create(options);
> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>  .apply(new DataLoader())
>  .apply(JdbcIO.<String>write()
>      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>          .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>          .withUsername("postgres")
>          .withPassword("postgres"))
>      .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>          public void setParameters(String element, PreparedStatement query) throws SQLException {
>              String[] datas = element.split("\t");
>              if (datas.length > 0) {
>                  for (int j = 0; j < datas.length; j++) {
>                      query.setString(j + 1, datas[j]);
>                  }
>              }
>          }
>      }));
>
> SparkRunner runner = SparkRunner.create(options);
> runner.run(p).waitUntilFinish();
>
>
>
>
>
> Any help would be greatly appreciated.
>
>
>
> Thanks,
>
> Mahender
>
>
>
