beam-commits mailing list archives

From "Luke Cwik (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-3210) The problem about the use of waitUntilFinish() in DirectRunner
Date Fri, 17 Nov 2017 17:05:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257227#comment-16257227 ]

Luke Cwik commented on BEAM-3210:
---------------------------------

You're using a non-thread-safe data structure and not synchronizing reads/writes.

The DirectRunner processes elements in parallel using multiple threads, which is why the elements
in your list are appearing out of order.

Consider either using a thread-safe data structure or synchronizing your reads and writes to your list, for example with [Collections#synchronizedList](https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)).
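A minimal sketch of the `Collections#synchronizedList` suggestion, written without Beam so it runs on its own. The class name `SynchronizedListSketch`, the helper `collect`, and the thread pool are hypothetical stand-ins for the DirectRunner's worker threads, not part of the reported code or of Beam itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedListSketch {

  // Hypothetical helper: adds 1.0..n to one shared list from several threads.
  static List<Double> collect(int n, int threads) throws InterruptedException {
    // The wrapper serializes each add/get call on the underlying ArrayList.
    List<Double> shared = Collections.synchronizedList(new ArrayList<Double>());

    // Assumption: a fixed pool stands in for the runner's parallel workers.
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 1; i <= n; i++) {
      final double value = i;
      pool.execute(() -> shared.add(value));
    }
    pool.shutdown();
    if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("workers did not finish in time");
    }

    // Iterating or copying a synchronized list needs an explicit lock.
    List<Double> result;
    synchronized (shared) {
      result = new ArrayList<>(shared);
    }
    // Insertion order still depends on thread scheduling, so sort if order matters.
    Collections.sort(result);
    return result;
  }

  public static void main(String[] args) throws InterruptedException {
    List<Double> out = collect(10, 4);
    System.out.println("mylist_size_a=" + out.size());
    for (Double d : out) {
      System.out.println("mylist_data=" + d);
    }
  }
}
```

In the reported program the analogous change would be declaring `myList` as `Collections.synchronizedList(new ArrayList<Double>())`. Synchronizing prevents lost or null entries but does not restore input order, which is why the sketch sorts the copy before returning it.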



> The problem about the use of waitUntilFinish() in DirectRunner
> --------------------------------------------------------------
>
>                 Key: BEAM-3210
>                 URL: https://issues.apache.org/jira/browse/BEAM-3210
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.1.0
>         Environment: Ubuntu 14.04.3 LTS
> JDK 1.8
> Beam 2.1.0
> Maven 3.5.0
>            Reporter: Rick Lin
>            Assignee: Thomas Groh
>             Fix For: 2.1.0
>
>
> Dear sir,
> The description of waitUntilFinish() is "waits until the pipeline finishes and returns the final status."
> In my project, a static variable is used to record the elements of a PCollection, where the static variable is a list.
> For this, I used "p.run().waitUntilFinish()" to wait until the pipeline finishes, to avoid losing records in the list.
> Unfortunately, the list{color:#d04437} *sometimes* {color}records a "null" value instead of the actual value.
> To explain clearly, I provide my Java code below.
> {color:#14892c}"import java.io.IOException;
> import java.util.ArrayList;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.Mean;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
> import org.apache.beam.sdk.values.PCollection;
> public class BeamTestStatic extends Thread {
>   public static ArrayList<Double> myList = new ArrayList<Double>();
>   public static class StaticTest extends DoFn<Double, Void> {
>     @ProcessElement		 
>    	public void test(ProcessContext c) {
> 		  myList.add(c.element());	
>    	}		
>   }
>  public static void main(String[] args) throws IOException {			
> 	StaticTest testa=new StaticTest();
> 	PipelineOptions options = PipelineOptionsFactory.create();
> 	Pipeline p = Pipeline.create(options);
>   PCollection<Double> data=p.apply("Rawdata",
> Create.of(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0));
> 	PCollection<Void> listtest= data.apply(ParDo.of(testa));
>   p.run().waitUntilFinish();
>   System.out.println("mylist_size_a="+myList.size());
> 		 
>         for (int i = 0; i < myList.size(); i++) {
>         	System.out.println("mylist_data="+myList.get(i));
>         }
>  }
> }
> "{color}
> In addition, the result of my code is:
> {color:#205081}"mylist_size_a=10
> mylist_data=null
> mylist_data=4.0
> mylist_data=5.0
> mylist_data=9.0
> mylist_data=6.0
> mylist_data=1.0
> mylist_data=7.0
> mylist_data=8.0
> mylist_data=10.0
> mylist_data=3.0"{color}
> If you have any further information, I am glad to be informed.
> Thanks
> Rick



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
