accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [06/45] accumulo git commit: ACCUMULO-4391 Added appropriate synchronization to allow RFile.Reader deepcopies to be used in separate threads.
Date Fri, 09 Sep 2016 23:37:25 GMT
ACCUMULO-4391 Added appropriate synchronization to allow RFile.Reader deepcopies to be used
in separate threads.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6ff35f0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6ff35f0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6ff35f0f

Branch: refs/heads/1.6
Commit: 6ff35f0f013497871df22acc842341ea7163a5ab
Parents: 5957c22
Author: Ivan Bella <ivan@bella.name>
Authored: Mon Aug 1 20:30:53 2016 -0400
Committer: Ivan Bella <ivan@bella.name>
Committed: Mon Aug 1 20:30:53 2016 -0400

----------------------------------------------------------------------
 .../file/blockfile/impl/CachableBlockFile.java  |  5 +++-
 .../accumulo/core/file/rfile/bcfile/BCFile.java | 25 ++++++++++++----
 .../bcfile/BoundedRangeFileInputStream.java     | 30 +++++++++++---------
 3 files changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6ff35f0f/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 3b12d07..be464be 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -417,7 +417,10 @@ public class CachableBlockFile {
         _bc.close();
 
       if (fin != null) {
-        fin.close();
+        // synchronize on the FSDataInputStream to ensure thread safety with the BoundedRangeFileInputStream
+        synchronized (fin) {
+          fin.close();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6ff35f0f/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 4767d91..b8cd639 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -614,7 +614,7 @@ public final class BCFile {
       private final Algorithm compressAlgo;
       private Decompressor decompressor;
       private final BlockRegion region;
-      private final InputStream in;
+      private InputStream in;
 
       public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region,
Configuration conf, CryptoModule cryptoModule,
           Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException
{
@@ -672,11 +672,24 @@ public final class BCFile {
       }
 
       public void finish() throws IOException {
-        try {
-          in.close();
-        } finally {
-          compressAlgo.returnDecompressor(decompressor);
-          decompressor = null;
+        final InputStream inLocal = in;
+        if (inLocal != null) {
+          synchronized (inLocal) {
+            if (in != null) {
+              try {
+                in.close();
+              } finally {
+                in = null;
+                if (decompressor != null) {
+                  try {
+                    compressAlgo.returnDecompressor(decompressor);
+                  } finally {
+                    decompressor = null;
+                  }
+                }
+              }
+            }
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6ff35f0f/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
index 285040f..b182b26 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
@@ -60,17 +60,15 @@ class BoundedRangeFileInputStream extends InputStream {
     this.mark = -1;
   }
 
+  private void check() throws IOException {
+    if (in == null) {
+      throw new IOException("Stream closed");
+    }
+  }
+
   @Override
   public int available() throws IOException {
-    final FSDataInputStream inLocal = in;
-    synchronized (inLocal) {
-      int avail = inLocal.available();
-      if (pos + avail > end) {
-        avail = (int) (end - pos);
-      }
-
-      return avail;
-    }
+    return (int) (end - pos);
   }
 
   @Override
@@ -97,7 +95,9 @@ class BoundedRangeFileInputStream extends InputStream {
       return -1;
     Integer ret = 0;
     final FSDataInputStream inLocal = in;
+    check(); // ensuring inLocal is not null
     synchronized (inLocal) {
+      check(); // ensuring in is not null in which case we were closed which would be followed
by someone else reusing the decompressor
       inLocal.seek(pos);
       try {
         ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>()
{
@@ -149,9 +149,13 @@ class BoundedRangeFileInputStream extends InputStream {
 
   @Override
   public void close() {
-    // Invalidate the state of the stream.
-    in = null;
-    pos = end;
-    mark = -1;
+    final FSDataInputStream inLocal = in;
+    if (inLocal != null) {
+      // synchronize on the FSDataInputStream to ensure we block closing if in the read method
+      synchronized (inLocal) {
+        // Invalidate the state of the stream.
+        in = null;
+      }
+    }
   }
 }


Mime
View raw message