spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-11495] Fix potential socket / file handle leaks that were found via static analysis
Date Thu, 19 Nov 2015 00:00:39 GMT
Repository: spark
Updated Branches:
  refs/heads/master c07a50b86 -> 4b1171219


[SPARK-11495] Fix potential socket / file handle leaks that were found via static analysis

The HP Fortify Open Source Review team (https://www.hpfod.com/open-source-review-project)
reported a handful of potential resource leaks that were discovered using their static analysis
tool. We should fix the issues identified by their scan.
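
The common pattern applied across these files is Guava's Closeables.close(closeable, swallowIOException), invoked from a finally block (or from a catch block before rethrowing), so that a failure partway through I/O no longer leaks the underlying socket or file handle. A minimal, self-contained sketch of that pattern (the file name and payload below are hypothetical and not part of this commit):

    import com.google.common.io.Closeables;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class CloseablesSketch {
      public static void main(String[] args) throws IOException {
        OutputStream out = null;
        boolean threw = true;
        try {
          out = new FileOutputStream("example.bin");  // hypothetical output file
          out.write(new byte[] {1, 2, 3});
          threw = false;
        } finally {
          // Swallow a secondary IOException from close() only when the body already threw,
          // so the original failure is not masked; Closeables.close is null-safe.
          Closeables.close(out, threw);
        }
      }
    }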

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9455 from JoshRosen/fix-potential-resource-leaks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b117121
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b117121
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b117121

Branch: refs/heads/master
Commit: 4b117121900e5f242e7c8f46a69164385f0da7cc
Parents: c07a50b
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Wed Nov 18 16:00:35 2015 -0800
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Nov 18 16:00:35 2015 -0800

----------------------------------------------------------------------
 .../spark/unsafe/map/BytesToBytesMap.java       |  7 ++++
 .../unsafe/sort/UnsafeSorterSpillReader.java    | 38 ++++++++++++--------
 .../examples/streaming/JavaCustomReceiver.java  | 31 ++++++++--------
 .../network/ChunkFetchIntegrationSuite.java     | 15 +++++---
 .../network/shuffle/TestShuffleDataContext.java | 32 ++++++++++-------
 .../spark/streaming/JavaReceiverAPISuite.java   | 20 +++++++----
 6 files changed, 90 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b117121/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 04694dc..3387f9a 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -272,6 +273,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
             }
           }
           try {
+            Closeables.close(reader, /* swallowIOException = */ false);
             reader = spillWriters.getFirst().getReader(blockManager);
             recordsInPage = -1;
           } catch (IOException e) {
@@ -318,6 +320,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
         try {
           reader.loadNext();
         } catch (IOException e) {
+          try {
+            reader.close();
+          } catch(IOException e2) {
+            logger.error("Error while closing spill reader", e2);
+          }
           // Scala iterator does not handle exception
           Platform.throwException(e);
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b117121/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 039e940..dcb13e6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -20,8 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
 import java.io.*;
 
 import com.google.common.io.ByteStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.io.Closeables;
 
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.BlockManager;
@@ -31,10 +30,8 @@ import org.apache.spark.unsafe.Platform;
  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
  */
-public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
-  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
 
-  private final File file;
   private InputStream in;
   private DataInputStream din;
 
@@ -52,11 +49,15 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    this.file = file;
     final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
-    this.in = blockManager.wrapForCompression(blockId, bs);
-    this.din = new DataInputStream(this.in);
-    numRecordsRemaining = din.readInt();
+    try {
+      this.in = blockManager.wrapForCompression(blockId, bs);
+      this.din = new DataInputStream(this.in);
+      numRecordsRemaining = din.readInt();
+    } catch (IOException e) {
+      Closeables.close(bs, /* swallowIOException = */ true);
+      throw e;
+    }
   }
 
   @Override
@@ -75,12 +76,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
     ByteStreams.readFully(in, arr, 0, recordLength);
     numRecordsRemaining--;
     if (numRecordsRemaining == 0) {
-      in.close();
-      if (!file.delete() && file.exists()) {
-        logger.warn("Unable to delete spill file {}", file.getPath());
-      }
-      in = null;
-      din = null;
+      close();
     }
   }
 
@@ -103,4 +99,16 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
   public long getKeyPrefix() {
     return keyPrefix;
   }
+
+  @Override
+  public void close() throws IOException {
+   if (in != null) {
+     try {
+       in.close();
+     } finally {
+       in = null;
+       din = null;
+     }
+   }
+  }
 }
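
The constructor change above follows a defensive idiom: if wrapping the buffered stream or reading the record count fails, the underlying stream is closed before the exception is rethrown, so the spill file's handle is not leaked. A standalone sketch of that idiom (the class and method names here are hypothetical, not Spark APIs):

    import com.google.common.io.Closeables;
    import java.io.*;

    final class SpillHeaderSketch {
      // Opens a file and reads a length header; closes the stream if setup fails part-way.
      static DataInputStream open(File file) throws IOException {
        InputStream bs = new BufferedInputStream(new FileInputStream(file));
        try {
          DataInputStream din = new DataInputStream(bs);
          din.readInt();  // e.g. a record count; may throw, as in the real constructor
          return din;
        } catch (IOException e) {
          // Close the partially constructed stream so the handle is not leaked,
          // swallowing any secondary close() failure so the original exception propagates.
          Closeables.close(bs, /* swallowIOException = */ true);
          throw e;
        }
      }
    }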

http://git-wip-us.apache.org/repos/asf/spark/blob/4b117121/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 99df259..4b50fbf 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -18,6 +18,7 @@
 package org.apache.spark.examples.streaming;
 
 import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -121,23 +122,23 @@ public class JavaCustomReceiver extends Receiver<String> {
 
   /** Create a socket connection and receive data until receiver is stopped */
   private void receive() {
-    Socket socket = null;
-    String userInput = null;
-
     try {
-      // connect to the server
-      socket = new Socket(host, port);
-
-      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
-      // Until stopped or connection broken continue reading
-      while (!isStopped() && (userInput = reader.readLine()) != null) {
-        System.out.println("Received data '" + userInput + "'");
-        store(userInput);
+      Socket socket = null;
+      BufferedReader reader = null;
+      String userInput = null;
+      try {
+        // connect to the server
+        socket = new Socket(host, port);
+        reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+        // Until stopped or connection broken continue reading
+        while (!isStopped() && (userInput = reader.readLine()) != null) {
+          System.out.println("Received data '" + userInput + "'");
+          store(userInput);
+        }
+      } finally {
+        Closeables.close(reader, /* swallowIOException = */ true);
+        Closeables.close(socket,  /* swallowIOException = */ true);
       }
-      reader.close();
-      socket.close();
-
       // Restart in an attempt to connect again when server is active again
       restart("Trying to connect again");
     } catch(ConnectException ce) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4b117121/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index dc5fa1c..50a324e 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -78,10 +79,15 @@ public class ChunkFetchIntegrationSuite {
     testFile = File.createTempFile("shuffle-test-file", "txt");
     testFile.deleteOnExit();
     RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
-    byte[] fileContent = new byte[1024];
-    new Random().nextBytes(fileContent);
-    fp.write(fileContent);
-    fp.close();
+    boolean shouldSuppressIOException = true;
+    try {
+      byte[] fileContent = new byte[1024];
+      new Random().nextBytes(fileContent);
+      fp.write(fileContent);
+      shouldSuppressIOException = false;
+    } finally {
+      Closeables.close(fp, shouldSuppressIOException);
+    }
 
     final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
@@ -117,6 +123,7 @@ public class ChunkFetchIntegrationSuite {
 
   @AfterClass
   public static void tearDown() {
+    bufferChunk.release();
     server.close();
     clientFactory.close();
     testFile.delete();

http://git-wip-us.apache.org/repos/asf/spark/blob/4b117121/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 3fdde05..7ac1ca1 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
@@ -60,21 +61,28 @@ public class TestShuffleDataContext {
   public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
     String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
 
-    OutputStream dataStream = new FileOutputStream(
-      ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
-    DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
-      ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+    OutputStream dataStream = null;
+    DataOutputStream indexStream = null;
+    boolean suppressExceptionsDuringClose = true;
 
-    long offset = 0;
-    indexStream.writeLong(offset);
-    for (byte[] block : blocks) {
-      offset += block.length;
-      dataStream.write(block);
+    try {
+      dataStream = new FileOutputStream(
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
+      indexStream = new DataOutputStream(new FileOutputStream(
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+
+      long offset = 0;
       indexStream.writeLong(offset);
+      for (byte[] block : blocks) {
+        offset += block.length;
+        dataStream.write(block);
+        indexStream.writeLong(offset);
+      }
+      suppressExceptionsDuringClose = false;
+    } finally {
+      Closeables.close(dataStream, suppressExceptionsDuringClose);
+      Closeables.close(indexStream, suppressExceptionsDuringClose);
     }
-
-    dataStream.close();
-    indexStream.close();
   }
 
   /** Creates reducer blocks in a hash-based data format within our local dirs. */
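
The boolean flag in the hunk above exists so that an IOException thrown from close() is swallowed only when the write body has already failed; if the writes succeeded, a close() failure is still reported rather than silently dropped. The same idiom in isolation (file names below are hypothetical):

    import com.google.common.io.Closeables;
    import java.io.*;

    final class SuppressFlagSketch {
      // Writes one block plus its index entries, closing both streams exactly once.
      static void writeBlock(File dataFile, File indexFile, byte[] block) throws IOException {
        OutputStream dataStream = null;
        DataOutputStream indexStream = null;
        boolean suppressExceptionsDuringClose = true;
        try {
          dataStream = new FileOutputStream(dataFile);
          indexStream = new DataOutputStream(new FileOutputStream(indexFile));
          indexStream.writeLong(0L);
          dataStream.write(block);
          indexStream.writeLong(block.length);
          // The body completed, so a failure while closing should surface, not be swallowed.
          suppressExceptionsDuringClose = false;
        } finally {
          Closeables.close(dataStream, suppressExceptionsDuringClose);
          Closeables.close(indexStream, suppressExceptionsDuringClose);
        }
      }
    }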

http://git-wip-us.apache.org/repos/asf/spark/blob/4b117121/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index ec2bffd..7a8ef9d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -23,6 +23,7 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import static org.junit.Assert.*;
 
+import com.google.common.io.Closeables;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -121,14 +122,19 @@ public class JavaReceiverAPISuite implements Serializable {
 
     private void receive() {
       try {
-        Socket socket = new Socket(host, port);
-        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-        String userInput;
-        while ((userInput = in.readLine()) != null) {
-          store(userInput);
+        Socket socket = null;
+        BufferedReader in = null;
+        try {
+          socket = new Socket(host, port);
+          in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+          String userInput;
+          while ((userInput = in.readLine()) != null) {
+            store(userInput);
+          }
+        } finally {
+          Closeables.close(in, /* swallowIOException = */ true);
+          Closeables.close(socket,  /* swallowIOException = */ true);
         }
-        in.close();
-        socket.close();
       } catch(ConnectException ce) {
         ce.printStackTrace();
         restart("Could not connect", ce);


