flink-issues mailing list archives

From NicoK <...@git.apache.org>
Subject [GitHub] flink pull request #6272: [FLINK-9755][network] forward exceptions in Remote...
Date Thu, 12 Jul 2018 09:31:53 GMT
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201969455
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
    @@ -277,37 +277,17 @@ public void recycle(MemorySegment segment) {
     		// We do not know which locks have been acquired before the recycle() or are needed in the
     		// notification and which other threads also access them.
     		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
    -		boolean success = false;
    -		boolean needMoreBuffers = false;
    -		try {
    -			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    -			success = true;
    -		} catch (Throwable ignored) {
    -			// handled below, under the lock
    -		}
    +		// Note that in case of any exceptions notifyBufferAvailable() should recycle the buffer and
    +		// therefore end up in this method again.
    +		needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
     
    -		if (!success || needMoreBuffers) {
    +		if (needMoreBuffers) {
     			synchronized (availableMemorySegments) {
     				if (isDestroyed) {
     					// cleanup tasks how they would have been done if we only had one synchronized block
    -					if (needMoreBuffers) {
    -						listener.notifyBufferDestroyed();
    -					}
    -					if (!success) {
    -						returnMemorySegment(segment);
    -					}
    +					listener.notifyBufferDestroyed();
     				} else {
    -					if (needMoreBuffers) {
    -						registeredListeners.add(listener);
    -					}
    -					if (!success) {
    -						if (numberOfRequestedMemorySegments > currentPoolSize) {
    -							returnMemorySegment(segment);
    -						} else {
    -							availableMemorySegments.add(segment);
    -							availableMemorySegments.notify();
    -						}
    -					}
    +					registeredListeners.add(listener);
    --- End diff --
    
    That's how it has always been:
    a) If the exception is caught inside the `notifyBufferAvailable()` handler, `needMoreBuffers` is `false`, and we expect the error handling in that method to fail the logic that the thread waiting on the listener relies on. Otherwise, this thread's error handling needs to re-register the listener.
    
    b) If `notifyBufferAvailable()` throws (which is no longer allowed!), the previous logic did not re-add the listener either.
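
To make case a) concrete, here is a minimal sketch of the pattern under discussion. It is not Flink's actual code: `RecycleSketch`, `Pool`, `Listener`, and the use of `String` as a stand-in for a buffer are all hypothetical simplifications. It only illustrates that the listener is notified outside the lock and is re-registered solely when it returns `true`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified stand-in for the pool logic in the diff; not Flink's API. */
public class RecycleSketch {

    /** Hypothetical listener: returns true if it still needs more buffers. */
    public interface Listener {
        boolean notifyBufferAvailable(String buffer);
        void notifyBufferDestroyed();
    }

    public static class Pool {
        private final Object lock = new Object();
        public final Queue<Listener> registeredListeners = new ArrayDeque<>();
        public final Queue<String> availableBuffers = new ArrayDeque<>();
        public boolean isDestroyed;

        public void recycle(String buffer) {
            Listener listener;
            synchronized (lock) {
                listener = registeredListeners.poll();
                if (listener == null) {
                    // Nobody is waiting: keep the buffer in the pool.
                    availableBuffers.add(buffer);
                    return;
                }
            }

            // Called outside the lock to avoid deadlocks. The listener is assumed
            // to handle its own exceptions (and recycle the buffer) instead of throwing.
            boolean needMoreBuffers = listener.notifyBufferAvailable(buffer);

            if (needMoreBuffers) {
                synchronized (lock) {
                    if (isDestroyed) {
                        listener.notifyBufferDestroyed();
                    } else {
                        registeredListeners.add(listener);
                    }
                }
            }
            // If the listener returned false (for example after catching an error
            // internally), it is not re-added here.
        }
    }
}
```

In this sketch, a listener that swallows an error and returns `false` simply drops out of `registeredListeners`, so any re-registration has to come from the listener's own error handling.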


---
