accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] 01/03: ACCUMULO-4708 Limit RFile block size to 2GB
Date Thu, 09 Nov 2017 21:54:51 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit e875f01ed519100225bdf3a1550c40caccd8521f
Author: Nick <31989480+PircDef@users.noreply.github.com>
AuthorDate: Fri Sep 15 14:42:29 2017 -0400

    ACCUMULO-4708 Limit RFile block size to 2GB
---
 .../accumulo/core/conf/ConfigSanityCheck.java      |  9 +++
 .../org/apache/accumulo/core/file/rfile/RFile.java | 18 +++--
 .../accumulo/core/file/rfile/RFileOperations.java  |  6 ++
 .../accumulo/core/file/rfile/bcfile/BCFile.java    |  7 +-
 .../core/security/crypto/CryptoModule.java         |  1 +
 .../core/security/crypto/DefaultCryptoModule.java  |  3 +-
 .../security/crypto/RFileCipherOutputStream.java   | 86 ++++++++++++++++++++++
 pom.xml                                            |  3 +-
 8 files changed, 122 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
index 130863c..9c2ed6c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
@@ -21,6 +21,8 @@ import java.util.Map.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A utility class for validating {@link AccumuloConfiguration} instances.
  */
@@ -66,6 +68,13 @@ public class ConfigSanityCheck {
       if (key.equals(Property.INSTANCE_VOLUMES.getKey())) {
         usingVolumes = value != null && !value.isEmpty();
       }
+
+      // If the block size or block size index is configured to be too large, we throw an exception to avoid potentially corrupting RFiles later
+      if (key.equals(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey()) || key.equals(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey())) {
+        long bsize = ConfigurationTypeHelper.getFixedMemoryAsBytes(value);
+        Preconditions.checkArgument(bsize > 0 && bsize < Integer.MAX_VALUE, key + " must be greater than 0 and less than " + Integer.MAX_VALUE + " but was: "
+            + bsize);
+      }
     }
 
     if (instanceZkTimeoutValue != null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index c1931da..c399a22 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -404,7 +404,7 @@ public class RFile {
     private SampleLocalityGroupWriter sample;
 
     private SummaryStatistics keyLenStats = new SummaryStatistics();
-    private double avergageKeySize = 0;
+    private double averageKeySize = 0;
 
     LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize, LocalityGroupMetadata currentLocalityGroup,
         SampleLocalityGroupWriter sample) {
@@ -441,19 +441,27 @@ public class RFile {
       } else if (blockWriter.getRawSize() > blockSize) {
 
         // Look for a key thats short to put in the index, defining short as average or below.
-        if (avergageKeySize == 0) {
+        if (averageKeySize == 0) {
           // use the same average for the search for a below average key for a block
-          avergageKeySize = keyLenStats.getMean();
+          averageKeySize = keyLenStats.getMean();
         }
 
         // Possibly produce a shorter key that does not exist in data. Even if a key can be shortened, it may not be below average.
         Key closeKey = KeyShortener.shorten(prevKey, key);
 
-        if ((closeKey.getSize() <= avergageKeySize || blockWriter.getRawSize() > maxBlockSize) && !isGiantKey(closeKey)) {
+        if ((closeKey.getSize() <= averageKeySize || blockWriter.getRawSize() > maxBlockSize) && !isGiantKey(closeKey)) {
           closeBlock(closeKey, false);
           blockWriter = fileWriter.prepareDataBlock();
           // set average to zero so its recomputed for the next block
-          avergageKeySize = 0;
+          averageKeySize = 0;
+          // To constrain the growth of data blocks, we limit our worst case scenarios to closing
+          // blocks if they reach the maximum configurable block size of Integer.MAX_VALUE.
+          // 128 bytes added for metadata overhead
+        } else if (((long) key.getSize() + (long) value.getSize() + blockWriter.getRawSize() + 128L) >= Integer.MAX_VALUE) {
+          closeBlock(closeKey, false);
+          blockWriter = fileWriter.prepareDataBlock();
+          averageKeySize = 0;
+
         }
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 4d1af7e..195da93 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Preconditions;
+
 public class RFileOperations extends FileOperations {
 
   private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
@@ -82,7 +84,11 @@ public class RFileOperations extends FileOperations {
     AccumuloConfiguration acuconf = options.getTableConfiguration();
 
     long blockSize = acuconf.getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+    Preconditions.checkArgument((blockSize < Integer.MAX_VALUE && blockSize > 0), "table.file.compress.blocksize must be greater than 0 and less than "
+        + Integer.MAX_VALUE);
     long indexBlockSize = acuconf.getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+    Preconditions.checkArgument((indexBlockSize < Integer.MAX_VALUE && indexBlockSize > 0),
+        "table.file.compress.blocksize.index must be greater than 0 and less than " + Integer.MAX_VALUE);
 
     SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(acuconf);
     Sampler sampler = null;
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 5cfe824..a169619 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -283,12 +283,13 @@ public final class BCFile {
       /**
        * Get the raw size of the block.
        *
+       * Caution: size() comes from DataOutputStream which returns Integer.MAX_VALUE on an overflow. This results in a value of 2GiB meaning that
+       * an unknown amount of data, at least 2GiB large, has been written. RFiles handle this issue by keeping track of the position of blocks
+       * instead of relying on blocks to provide this information.
+       *
        * @return the number of uncompressed bytes written through the BlockAppender so far.
        */
       public long getRawSize() throws IOException {
-        /**
-         * Expecting the size() of a block not exceeding 4GB. Assuming the size() will wrap to negative integer if it exceeds 2GB.
-         */
         return size() & 0x00000000ffffffffL;
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
index 44531dc..c1962f7 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
@@ -28,6 +28,7 @@ import javax.crypto.CipherOutputStream;
  * Classes that obey this interface may be used to provide encrypting and decrypting streams to the rest of Accumulo. Classes that obey this interface may be
  * configured as the crypto module by setting the property crypto.module.class in the accumulo-site.xml file.
  *
+ * When implementing CryptoModule, it is recommended that any {@link CipherOutputStreams} uses {@link RFileCipherOutputStream} instead.
  *
  */
 public interface CryptoModule {
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
index 7609bb0..c5c41cd 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
@@ -30,7 +30,6 @@ import java.util.Map;
 
 import javax.crypto.Cipher;
 import javax.crypto.CipherInputStream;
-import javax.crypto.CipherOutputStream;
 import javax.crypto.spec.IvParameterSpec;
 import javax.crypto.spec.SecretKeySpec;
 
@@ -251,7 +250,7 @@ public class DefaultCryptoModule implements CryptoModule {
       throw new RuntimeException("Encryption cipher must be a block cipher");
     }
 
-    CipherOutputStream cipherOutputStream = new CipherOutputStream(params.getPlaintextOutputStream(), cipher);
+    RFileCipherOutputStream cipherOutputStream = new RFileCipherOutputStream(params.getPlaintextOutputStream(), cipher);
     BlockedOutputStream blockedOutputStream = new BlockedOutputStream(cipherOutputStream, cipher.getBlockSize(), params.getBlockStreamSize());
 
     params.setEncryptedOutputStream(blockedOutputStream);
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java
new file mode 100644
index 0000000..7dad802
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/RFileCipherOutputStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * see the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherOutputStream;
+
+/**
+ *
+ * This class extends {@link CipherOutputStream} to include a way to track the number of bytes that have
+ * been encrypted by the stream. The write method also includes a mechanism to stop writing and
+ * throw an exception if exceeding a maximum number of bytes is attempted.
+ *
+ */
+public class RFileCipherOutputStream extends CipherOutputStream {
+
+  // This is the maximum size encrypted stream that can be written. Attempting to write anything larger
+  // will cause an exception. Given that each block in an rfile is encrypted separately, and blocks
+  // should be written such that a block cannot ever reach 16GiB, this is believed to be a safe number.
+  // If this does cause an exception, it is an issue best addressed elsewhere.
+  private final long maxOutputSize = 1L << 34; //16GiB
+
+  // The total number of bytes that have been written out
+  private long count = 0;
+
+  /**
+   *
+   * Constructs a RFileCipherOutputStream
+   *
+   * @param os
+   *          the OutputStream object
+   * @param c
+   *          an initialized Cipher object
+   */
+  public RFileCipherOutputStream(OutputStream os, Cipher c) {
+    super(os, c);
+  }
+
+  /**
+   * Override of CipherOutputStream's write to count the number of bytes that have been encrypted.
+   * This method now throws an exception if an attempt to write bytes beyond a maximum is made.
+   */
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    count += len;
+    if (count > maxOutputSize) {
+      throw new IOException("Attempt to write " + count + " bytes was made. A maximum of " + maxOutputSize + " is allowed for an encryption stream.");
+    }
+    super.write(b, off, len);
+  }
+
+  @Override
+  public void write(byte b[]) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * Override of CipherOutputStream's write for a single byte to count it. This method now throws
+   * an exception if an attempt to write bytes beyond a maximum is made.
+   */
+  @Override
+  public void write(int b) throws IOException {
+    count++;
+    if (count > maxOutputSize) {
+      throw new IOException("Attempt to write " + count + " bytes was made. A maximum of " + maxOutputSize + " is allowed for an encryption stream.");
+    }
+    super.write(b);
+  }
+}
diff --git a/pom.xml b/pom.xml
index deaccf0..2cc3abf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,6 +151,7 @@
     <surefire.groups />
     <!-- Thrift version -->
     <thrift.version>0.10.0</thrift.version>
+    <unitTestMemSize>-Xmx1G</unitTestMemSize>
     <!-- ZooKeeper version -->
     <zookeeper.version>3.4.10</zookeeper.version>
   </properties>
@@ -868,7 +869,7 @@
             <systemPropertyVariables>
               <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
             </systemPropertyVariables>
-            <argLine>-Xmx1G</argLine>
+            <argLine>${unitTestMemSize}</argLine>
           </configuration>
         </plugin>
         <plugin>

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <commits@accumulo.apache.org>.

Mime
View raw message