flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
Date Wed, 19 Oct 2016 12:05:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588571#comment-15588571 ]

ASF GitHub Bot commented on FLINK-4586:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2639#discussion_r84054277
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java ---
    @@ -28,51 +28,52 @@
     public class AverageAccumulator implements SimpleAccumulator<Double> {
     
     	private static final long serialVersionUID = 3672555084179165255L;
    -	
    -	private double localValue;
    +
     	private long count;
     
    +	private double sum;
    +
     	@Override
     	public void add(Double value) {
     		this.count++;
    -		this.localValue += value;
    +		this.sum += value;
     	}
     
     	public void add(double value) {
     		this.count++;
    -		this.localValue += value;
    +		this.sum += value;
     	}
     
     	public void add(long value) {
     		this.count++;
    -		this.localValue += value;
    +		this.sum += value;
     	}
     
     	public void add(int value) {
     		this.count++;
    -		this.localValue += value;
    +		this.sum += value;
     	}
     
     	@Override
     	public Double getLocalValue() {
     		if (this.count == 0) {
     			return 0.0;
     		}
    -		return this.localValue / (double)this.count;
    +		return this.sum / this.count;
     	}
     
     	@Override
     	public void resetLocal() {
     		this.count = 0;
    -		this.localValue = 0;
    +		this.sum = 0;
     	}
     
     	@Override
     	public void merge(Accumulator<Double, Double> other) {
     		if (other instanceof AverageAccumulator) {
    -			AverageAccumulator temp = (AverageAccumulator)other;
    -			this.count += temp.count;
    -			this.localValue += other.getLocalValue();
    --- End diff --
    
    Yes, and the test did not catch the bug because it only merged two accumulators that each held a single value (for which the sum equals the average).
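To make the test blind spot concrete, here is a minimal, hypothetical Java sketch. `AverageMergeSketch` and its nested `Avg` are illustrative names, not Flink API. `Avg` keeps the same `count`/`sum` state as `AverageAccumulator`. `mergeBuggy` mirrors the removed lines (`this.count += temp.count; this.localValue += other.getLocalValue();`). `mergeFixed` adds raw sums and counts. The diff in this message is cut off before the patched `merge` body, so `mergeFixed` is only an assumption about how a correct merge can look. When two single-value accumulators are merged, both variants agree, so such a test passes. Merging multi-value accumulators exposes the difference.

```java
// Hypothetical sketch (not Flink code): mirrors the sum/count state of AverageAccumulator
// and contrasts the removed merge line with a merge that combines sums and counts.
public class AverageMergeSketch {

    static final class Avg {
        long count;
        double sum;

        void add(double value) {
            count++;
            sum += value;
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }

        // Mirrors the removed lines: adds the other accumulator's AVERAGE to this SUM.
        void mergeBuggy(Avg other) {
            count += other.count;
            sum += other.average();
        }

        // Assumed correct variant: combines raw sums and counts, so the merged average is exact.
        void mergeFixed(Avg other) {
            count += other.count;
            sum += other.sum;
        }
    }

    // Builds an accumulator that has seen the given values.
    static Avg of(double... values) {
        Avg a = new Avg();
        for (double v : values) {
            a.add(v);
        }
        return a;
    }

    public static void main(String[] args) {
        // Single-value accumulators: sum == average, so both merges agree (1.5).
        Avg b1 = of(1), f1 = of(1);
        b1.mergeBuggy(of(2));
        f1.mergeFixed(of(2));
        System.out.println(b1.average() + " vs " + f1.average());

        // Multi-value accumulators expose the bug: 11/6 (buggy) vs 21/6 (fixed).
        Avg b2 = of(1, 2, 3), f2 = of(1, 2, 3);
        b2.mergeBuggy(of(4, 5, 6));
        f2.mergeFixed(of(4, 5, 6));
        System.out.println(b2.average() + " vs " + f2.average());
    }
}
```

Running `main` prints both averages for each case. Only the second pair differs, which is the kind of case a regression test for this merge needs to cover.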


> NumberSequenceIterator and Accumulator threading issue
> ------------------------------------------------------
>
>                 Key: FLINK-4586
>                 URL: https://issues.apache.org/jira/browse/FLINK-4586
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.2
>            Reporter: Johannes
>            Assignee: Greg Hogan
>            Priority: Minor
>             Fix For: 1.2.0, 1.1.4
>
>         Attachments: FLINK4586Test.scala
>
>
> There is a strange problem when using the NumberSequenceIterator in combination with an AverageAccumulator.
> It seems like the individual accumulators are reinitialized and overwrite parts of the intermediate results.
> The following Scala snippet exemplifies the problem.
> The printed average should be {{50.5}}, but it is something completely different, such as {{8.08}}, depending on the number of cores used.
> If the parallelism is set to {{1}}, the result is correct, which suggests a threading problem.
> The problem occurs with both the Java and Scala APIs.
> {code}
> env
>   .fromParallelCollection(new NumberSequenceIterator(1, 100))
>   .map(new RichMapFunction[Long, Long] {
>     var a: AverageAccumulator = _
>     override def map(value: Long): Long = {
>       a.add(value)
>       value
>     }
>     override def open(parameters: Configuration): Unit = {
>       a = new AverageAccumulator
>       getRuntimeContext.addAccumulator("test", a)
>     }
>   })
>   .reduce((a, b) => a + b)
>   .print()
> val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult
> println(lastJobExecutionResult.getAccumulatorResult("test"))
> {code}
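The reporter's observation that the result depends on parallelism, and is correct only with parallelism 1, is consistent with the merge rule removed in the diff above. The following is a hypothetical Java simulation, not Flink code. It rests on two assumptions. First, the values 1..100 are split into `p` contiguous partitions. Second, each partition's accumulator is folded into the first one using the removed rule. Neither the real `NumberSequenceIterator` split nor Flink's actual merge order is modeled, so the sketch will not reproduce the reported 8.08.

```java
// Hypothetical simulation (not Flink code): splits 1..n into p contiguous partitions,
// keeps one sum/count accumulator per partition, and folds them together with the
// merge rule that the diff removes (count += other.count; sum += other.average()).
public class ParallelAverageSketch {

    // Average of 1..n after folding p partition accumulators with the removed merge rule.
    static double buggyAverage(int n, int p) {
        long totalCount = 0;
        double sum = 0;
        for (int part = 0; part < p; part++) {
            // Contiguous partition [lo, hi) of the values 1..n.
            int lo = 1 + (int) ((long) n * part / p);
            int hi = 1 + (int) ((long) n * (part + 1) / p);
            long count = 0;
            double partSum = 0;
            for (int v = lo; v < hi; v++) {
                count++;
                partSum += v;
            }
            totalCount += count;
            if (part == 0) {
                sum = partSum;           // first accumulator keeps its true sum
            } else if (count > 0) {
                sum += partSum / count;  // later ones contribute only their average
            }
        }
        return totalCount == 0 ? 0.0 : sum / totalCount;
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 2, 4, 8}) {
            System.out.println("p=" + p + " -> " + buggyAverage(100, p));
        }
    }
}
```

With p = 1 no merge happens, so the result is the correct 50.5. With p = 4 the sketch yields 514 / 100 = 5.14. The first partition contributes its sum (325), while the other three contribute only their averages (38, 63, 88).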



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
