accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [14/45] accumulo git commit: ACCUMULO-4391 Resolved the remaining pull request comments.
Date Fri, 09 Sep 2016 23:37:33 GMT
ACCUMULO-4391 Resolved the remaining pull request comments.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/feb2a51c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/feb2a51c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/feb2a51c

Branch: refs/heads/1.8
Commit: feb2a51cf6b4cbccd181f541b1929bf3fc166b8d
Parents: 9038d8b
Author: Ivan Bella <ivan@bella.name>
Authored: Thu Sep 8 12:09:15 2016 -0400
Committer: Ivan Bella <ivan@bella.name>
Committed: Thu Sep 8 12:09:15 2016 -0400

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/bcfile/BCFile.java       | 13 +++++++------
 .../rfile/bcfile/BoundedRangeFileInputStream.java     | 14 +++++++++-----
 .../core/file/rfile/MultiThreadedRFileTest.java       | 11 +++--------
 .../apache/accumulo/core/file/rfile/RFileTest.java    |  7 -------
 core/src/test/resources/log4j.properties              |  2 ++
 5 files changed, 21 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/feb2a51c/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index b8cd639..60376d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -614,7 +614,8 @@ public final class BCFile {
       private final Algorithm compressAlgo;
       private Decompressor decompressor;
       private final BlockRegion region;
-      private InputStream in;
+      private final InputStream in;
+      private volatile boolean closed;
 
       public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
           Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
@@ -652,6 +653,7 @@ public final class BCFile {
           compressAlgo.returnDecompressor(decompressor);
           throw e;
         }
+        closed = false;
       }
 
       /**
@@ -672,14 +674,13 @@ public final class BCFile {
       }
 
       public void finish() throws IOException {
-        final InputStream inLocal = in;
-        if (inLocal != null) {
-          synchronized (inLocal) {
-            if (in != null) {
+        if (!closed) {
+          synchronized (in) {
+            if (!closed) {
               try {
                 in.close();
               } finally {
-                in = null;
+                closed = true;
                 if (decompressor != null) {
                   try {
                     compressAlgo.returnDecompressor(decompressor);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/feb2a51c/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
index 876b585..b5ee61b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 class BoundedRangeFileInputStream extends InputStream {
 
   private volatile boolean closed = false;
-  private FSDataInputStream in;
+  private final FSDataInputStream in;
   private long pos;
   private long end;
   private long mark;
@@ -145,10 +145,14 @@ class BoundedRangeFileInputStream extends InputStream {
 
   @Override
   public void close() {
-    // synchronize on the FSDataInputStream to ensure we are blocked if in the read method
-    synchronized (in) {
-      // Invalidate the state of the stream.
-      closed = true;
+    // Synchronize on the FSDataInputStream to ensure we are blocked if in the read method:
+    // Once this close completes, the underlying decompression stream may be returned to
+    // the pool and subsequently used. Turns out this is a problem if currently using it to read.
+    if (!closed) {
+      synchronized (in) {
+        // Invalidate the state of the stream.
+        closed = true;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/feb2a51c/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 662bb1b..498551f 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,16 +60,12 @@ import static org.junit.Assert.assertTrue;
 
 public class MultiThreadedRFileTest {
 
+  private static final Logger LOG = Logger.getLogger(MultiThreadedRFileTest.class);
   private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
 
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
 
-  static {
-    Logger.getLogger(org.apache.hadoop.io.compress.CodecPool.class).setLevel(Level.OFF);
-    Logger.getLogger(org.apache.hadoop.util.NativeCodeLoader.class).setLevel(Level.OFF);
-  }
-
   private static void checkIndex(Reader reader) throws IOException {
     FileSKVIterator indexIter = reader.getIndex();
 
@@ -269,8 +264,8 @@ public class MultiThreadedRFileTest {
     }
 
     for (String message : messages.keySet()) {
-      System.out.println(messages.get(message) + ": " + message);
-      System.out.println(stackTrace.get(message));
+      LOG.error(messages.get(message) + ": " + message);
+      LOG.error(stackTrace.get(message));
     }
 
     assertTrue(threadExceptions.isEmpty());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/feb2a51c/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 6a29610..c345065 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -67,8 +67,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -82,11 +80,6 @@ public class RFileTest {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
 
-  static {
-    Logger.getLogger(org.apache.hadoop.io.compress.CodecPool.class).setLevel(Level.OFF);
-    Logger.getLogger(org.apache.hadoop.util.NativeCodeLoader.class).setLevel(Level.OFF);
-  }
-
   static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
 
     public SeekableByteArrayInputStream(byte[] buf) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/feb2a51c/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 9f968f8..40adebf 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -25,4 +25,6 @@ log4j.logger.org.apache.commons.vfs2.impl.DefaultFileSystemManager=WARN
 log4j.logger.org.apache.hadoop.mapred=ERROR
 log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=ERROR
 log4j.logger.org.apache.hadoop.util.ProcessTree=ERROR
+log4j.logger.org.apache.hadoop.io.compress.CodecPool=FATAL
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=FATAL
 log4j.logger.org.apache.accumulo.core.util.format=FATAL


Mime
View raw message