nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From won...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-39] SonarCloud Bugs and Vulnerabilities for RuntimeExecutor
Date Sat, 24 Mar 2018 04:08:29 GMT
This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 46a2dcc  [NEMO-39] SonarCloud Bugs and Vulnerabilities for RuntimeExecutor
46a2dcc is described below

commit 46a2dcc0ab666f2a78a4dff4944ff793d95c2aeb
Author: Sanha Lee <sanhaleehana@gmail.com>
AuthorDate: Sat Mar 24 13:08:27 2018 +0900

    [NEMO-39] SonarCloud Bugs and Vulnerabilities for RuntimeExecutor
    
    JIRA: [NEMO-39](https://issues.apache.org/jira/browse/NEMO-39)
    
    **Major changes:**
    - Minor changes to meet SonarCloud criteria (including re-interrupting the current thread when an `InterruptedException` is caught).
    
    **Minor changes to note:**
    - None
    
    **Tests for the changes:**
    - Existing tests cover the changes.
    
    **Other comments:**
    - None
    
    resolves [NEMO-39](https://issues.apache.org/jira/browse/NEMO-39)
---
 .../edu/snu/nemo/runtime/executor/TaskGroupExecutor.java   |  9 ++++++++-
 .../runtime/executor/bytetransfer/ByteInputContext.java    | 14 +++++++++++++-
 .../nemo/runtime/executor/bytetransfer/ByteTransport.java  |  1 +
 .../snu/nemo/runtime/executor/data/BlockManagerWorker.java | 14 +++++++++++---
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 89284e2..2c49300 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -207,11 +207,13 @@ public final class TaskGroupExecutor {
             Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
         LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
             new Object[] {taskGroupId, ex.toString()});
+        Thread.currentThread().interrupt();
       } catch (final BlockWriteException ex2) {
         taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.FAILED_RECOVERABLE,
             Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
         LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
             new Object[] {taskGroupId, ex2.toString()});
+        Thread.currentThread().interrupt();
       } catch (final Exception e) {
         taskGroupStateManager.onTaskStateChanged(
             physicalTaskId, TaskState.State.FAILED_UNRECOVERABLE, Optional.empty());
@@ -290,7 +292,10 @@ public final class TaskGroupExecutor {
             }
             sideInputMap.put(srcTransform, sideInput);
             sideInputIterators.add(sideInputIterator);
-          } catch (final InterruptedException | ExecutionException e) {
+          } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new BlockFetchException(e);
+          } catch (final ExecutionException e) {
             throw new BlockFetchException(e);
           }
         });
@@ -347,6 +352,7 @@ public final class TaskGroupExecutor {
           }
         }
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new BlockFetchException(e);
       }
 
@@ -424,6 +430,7 @@ public final class TaskGroupExecutor {
           }
         }
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new BlockFetchException(e);
       }
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
index e3dc30e..a2fcd5e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
@@ -16,11 +16,14 @@
 package edu.snu.nemo.runtime.executor.bytetransfer;
 
 import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -34,6 +37,8 @@ import java.util.concurrent.CompletableFuture;
  */
 public final class ByteInputContext extends ByteTransferContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ByteInputContext.class.getName());
+
   private final CompletableFuture<Iterator<InputStream>> completedFuture = new
CompletableFuture<>();
   private final ClosableBlockingQueue<ByteBufInputStream> byteBufInputStreams = new
ClosableBlockingQueue<>();
   private volatile ByteBufInputStream currentByteBufInputStream = null;
@@ -44,6 +49,7 @@ public final class ByteInputContext extends ByteTransferContext {
       try {
         return byteBufInputStreams.peek() != null;
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       }
     }
@@ -53,7 +59,9 @@ public final class ByteInputContext extends ByteTransferContext {
       try {
         return byteBufInputStreams.take();
       } catch (final InterruptedException e) {
-        throw new RuntimeException(e);
+        Thread.currentThread().interrupt();
+        LOG.error("Interrupted while taking byte buf.", e);
+        throw new NoSuchElementException();
       }
     }
   };
@@ -162,6 +170,7 @@ public final class ByteInputContext extends ByteTransferContext {
         }
         return b;
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new IOException(e);
       }
     }
@@ -196,6 +205,7 @@ public final class ByteInputContext extends ByteTransferContext {
         }
         return readBytes;
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new IOException(e);
       }
     }
@@ -230,6 +240,7 @@ public final class ByteInputContext extends ByteTransferContext {
         }
         return skippedBytes;
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new IOException(e);
       }
     }
@@ -244,6 +255,7 @@ public final class ByteInputContext extends ByteTransferContext {
           return head.readableBytes();
         }
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new IOException(e);
       }
     }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
index 0f7eae2..fa5116b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
@@ -125,6 +125,7 @@ final class ByteTransport implements AutoCloseable {
             break;
           }
         } catch (final InterruptedException e) {
+          Thread.currentThread().interrupt();
           LOG.debug(String.format("Interrupted while binding to %s:%d", host, candidatePort),
e);
         }
       }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index d108a19..f340ab0 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -409,13 +409,21 @@ public final class BlockManagerWorker {
               || DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
             final FileStore fileStore = (FileStore) getBlockStore(blockStore);
             for (final FileArea fileArea : fileStore.getFileAreas(blockId, keyRange)) {
-              outputContext.newOutputStream().writeFileArea(fileArea).close();
+              try (final ByteOutputContext.ByteOutputStream stream = outputContext.newOutputStream())
{
+                stream.writeFileArea(fileArea);
+              }
             }
           } else {
             final Optional<Iterable<SerializedPartition>> optionalResult = getBlockStore(blockStore)
                 .getSerializedPartitions(blockId, keyRange);
-            for (final SerializedPartition partition : optionalResult.get()) {
-              outputContext.newOutputStream().writeSerializedPartition(partition).close();
+            if (optionalResult.isPresent()) {
+              for (final SerializedPartition partition : optionalResult.get()) {
+                try (final ByteOutputContext.ByteOutputStream stream = outputContext.newOutputStream())
{
+                  stream.writeSerializedPartition(partition);
+                }
+              }
+            } else {
+              throw new IOException("Block is not found!");
             }
           }
           handleUsedData(blockStore, blockId);

-- 
To stop receiving notification emails like this one, please contact
wonook@apache.org.

Mime
View raw message