beam-commits mailing list archives

From "liyuntian (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2274) beam on spark runner run much slower than using spark
Date Mon, 15 May 2017 08:44:04 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010175#comment-16010175 ]

liyuntian commented on BEAM-2274:
---------------------------------

This is my Spark code:

JavaRDD<String> input = sc.textFile(hdfsIn);
JavaRDD<List<String>> input1 = input.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String s) throws Exception {
        return Arrays.asList(s.split(","));
    }
});
JavaRDD<List<String>> input2 = input1.map(new Function<List<String>, List<String>>() {
    @Override
    public List<String> call(List<String> list) throws Exception {
        ArrayList<String> lists = new ArrayList<>(list);
        String a = lists.get(0);
        String b = lists.get(1);
        lists.add(a + b);
        lists.add(String.valueOf(Integer.parseInt(a) + Integer.parseInt(b)));
        lists.add(a.split(",")[0]);
        lists.add(b + a);
        return lists;
    }
});
input2.saveAsTextFile(hdfsOut);


This is my Beam code:

Read.Bounded<KV<LongWritable, Text>> source = Read.from(
        HDFSFileSource.from(inputPath, TextInputFormat.class, LongWritable.class, Text.class)
                .withConfiguration(config));
PCollection<KV<LongWritable, Text>> recordsFromHdfs = pipeline.apply(source);
PCollection<List<String>> recordsList =
        recordsFromHdfs.apply(ParDo.of(new InputHdfsFileFn(delimit, firstTableColumnsSize)));
// convert to flow; the logic is the same as in the Spark job above
String nextOutputTable;
PCollection<List<String>> nextPCollection = ComponentConvert.convert(component, recordsList);
// write the result to HDFS
PCollection<String> recordsToHdfs = nextPCollection.apply(ParDo.of(new OutputHdfsFileFn(delimit)));
HiveTable.deleteBeamFileOnHdfs(outputPath);
logger.info("Output file location: " + outputPath);
recordsToHdfs.apply(Write.to(HDFSFileSink.<String>toText(outputPath).withConfiguration(config)));
pipeline.run().waitUntilFinish();
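
InputHdfsFileFn and OutputHdfsFileFn are not shown in this thread. As a rough sketch of their likely shape, assuming the input Fn only splits each line on the delimiter and the output Fn joins the columns back into a delimited line (the real InputHdfsFileFn also takes a firstTableColumnsSize argument that is omitted here), they might look like this:

// Minimal sketch only: the real DoFns are not posted in this thread, so the
// split/join logic below is an assumption, and the firstTableColumnsSize
// parameter of the real InputHdfsFileFn is not modeled.
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

class InputHdfsFileFn extends DoFn<KV<LongWritable, Text>, List<String>> {
    private final String delimit;

    InputHdfsFileFn(String delimit) {
        this.delimit = delimit;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Split each HDFS text line into columns, mirroring the first Spark map().
        c.output(Arrays.asList(c.element().getValue().toString().split(delimit)));
    }
}

class OutputHdfsFileFn extends DoFn<List<String>, String> {
    private final String delimit;

    OutputHdfsFileFn(String delimit) {
        this.delimit = delimit;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Join the columns back into one delimited line before writing to HDFS.
        StringBuilder line = new StringBuilder();
        List<String> columns = c.element();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                line.append(delimit);
            }
            line.append(columns.get(i));
        }
        c.output(line.toString());
    }
}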

My Spark version is 1.6.2. I run both jobs over 4320M of input files and look at the Elapsed time on YARN: the Spark job takes 250 s and the Beam job takes 380 s. Is it possible that Beam spends more time reading the files from HDFS, or that translating the pipeline into Spark RDDs costs more time, or is there something wrong with my code?
Another question: when I run this code with Spark 1.6.2, I get the following error:
17/05/15 15:48:11 ERROR executor.Executor: Exception in task 1.0 in stage 1.0 (TID 3)
java.util.NoSuchElementException
	at org.apache.beam.sdk.io.hdfs.HDFSFileSource$HDFSFileReader.getCurrent(HDFSFileSource.java:510)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:142)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.next(SourceRDD.java:111)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:165)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:162)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
	at org.apache.beam.spark.repackaged.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
17/05/15 15:48:12 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5


> beam on spark runner run much slower than using spark
> -----------------------------------------------------
>
>                 Key: BEAM-2274
>                 URL: https://issues.apache.org/jira/browse/BEAM-2274
>             Project: Beam
>          Issue Type: Test
>          Components: runner-spark
>            Reporter: liyuntian
>            Assignee: Jean-Baptiste Onofré
>
> I run a job that reads HDFS files using Read.from(HDFSFileSource.from()) and applies some ParDo.of
> functions. I also run the same job reading the HDFS files with sc.textFile(file) and applying the
> equivalent RDD operations, but I find the Beam job is much slower than the Spark job. Is there
> something Beam should improve, or is there something wrong with my system or my code? Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
