accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject git commit: ACCUMULO-1998
Date Wed, 08 Jan 2014 20:57:02 GMT
Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 133e90a3c -> 443cba7a7


ACCUMULO-1998

All encrypted walog events are now individually blocked on disk. This leads to an additional maxBlockSize parameter (mostly to handle OOM from mismatched crypto). Additionally, because of this behavior, as well as PKCS5 behavior, I have turned off all padding on the default crypto configs; padding should not be used, as it can cause data loss in walogs. I have hammered 5 instances on and off every minute for 22 hours and counting with no related issues, so I deem it a fix.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/443cba7a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/443cba7a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/443cba7a

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 443cba7a7a3838b547880f1c49f2a9e0128692cd
Parents: 133e90a
Author: John Vines <vines@apache.org>
Authored: Wed Jan 8 15:51:03 2014 -0500
Committer: John Vines <vines@apache.org>
Committed: Wed Jan 8 15:51:03 2014 -0500

----------------------------------------------------------------------
 conf/examples/crypto/accumulo-site.xml          |   2 +-
 .../org/apache/accumulo/core/conf/Property.java |   3 +
 .../accumulo/core/file/rfile/bcfile/BCFile.java |   9 +-
 .../security/crypto/BlockedInputStream.java     | 174 +++++++++++++++++++
 .../security/crypto/BlockedOutputStream.java    |  96 ++++++++++
 .../security/crypto/CryptoModuleFactory.java    | 111 ++++++------
 .../security/crypto/CryptoModuleParameters.java |  20 ++-
 .../security/crypto/DefaultCryptoModule.java    |  26 ++-
 .../security/crypto/NoFlushOutputStream.java    |  31 ++++
 .../crypto-on-accumulo-site.xml                 | 162 -----------------
 ...rypto-on-no-key-encryption-accumulo-site.xml | 144 ---------------
 .../security/crypto/BlockedIOStreamTest.java    |  74 ++++++++
 .../core/security/crypto/CryptoTest.java        |   4 +-
 .../test/resources/crypto-on-accumulo-site.xml  |   2 +-
 ...rypto-on-no-key-encryption-accumulo-site.xml |   2 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  |  13 +-
 16 files changed, 484 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/conf/examples/crypto/accumulo-site.xml
----------------------------------------------------------------------
diff --git a/conf/examples/crypto/accumulo-site.xml b/conf/examples/crypto/accumulo-site.xml
index 05181e8..ca47f9a 100644
--- a/conf/examples/crypto/accumulo-site.xml
+++ b/conf/examples/crypto/accumulo-site.xml
@@ -125,7 +125,7 @@
     </property>
     <property>
       <name>crypto.cipher.suite</name>
-      <value>AES/CFB/PKCS5Padding</value>
+      <value>AES/CFB/NoPadding</value>
     </property>
     <property>
       <name>crypto.cipher.algorithm.name</name>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 2ab3a20..6c4b1fe 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -50,6 +50,9 @@ public enum Property {
   CRYPTO_CIPHER_ALGORITHM_NAME("crypto.cipher.algorithm.name", "NullCipher", PropertyType.STRING,
       "States the name of the algorithm used in the corresponding cipher suite. Do not make these different, unless you enjoy mysterious exceptions and bugs."),
   @Experimental
+  CRYPTO_BLOCK_STREAM_SIZE("crypto.block.stream.size", "1K", PropertyType.MEMORY,
+      "The size of the buffer above the cipher stream. Used for reading files and padding walog entries."),
+  @Experimental
   CRYPTO_CIPHER_KEY_LENGTH("crypto.cipher.key.length", "128", PropertyType.STRING,
       "Specifies the key length *in bits* to use for the symmetric key, should probably be 128 or 256 unless you really know what you're doing"),
   @Experimental

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 7ce6c2a..f91ac59 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -90,7 +90,6 @@ public final class BCFile {
     private final FSDataOutputStream out;
     private final Configuration conf;
     private final CryptoModule cryptoModule;
-    private final Map<String,String> cryptoConf;
     private BCFileCryptoModuleParameters cryptoParams;
     private SecretKeyEncryptionStrategy secretKeyEncryptionStrategy;
     // the single meta block containing index of compressed data blocks
@@ -363,16 +362,10 @@ public final class BCFile {
 
       @SuppressWarnings("deprecation")
       AccumuloConfiguration accumuloConfiguration = AccumuloConfiguration.getSiteConfiguration();
-      this.cryptoConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
 
       this.cryptoModule = CryptoModuleFactory.getCryptoModule(accumuloConfiguration);
-      Map<String,String> instanceProperties = accumuloConfiguration.getAllPropertiesWithPrefix(Property.INSTANCE_PREFIX);
-      if (instanceProperties != null) {
-        this.cryptoConf.putAll(instanceProperties);
-      }
-
       this.cryptoParams = new BCFileCryptoModuleParameters();
-      CryptoModuleFactory.fillParamsObjectFromStringMap(cryptoParams, cryptoConf);
+      CryptoModuleFactory.fillParamsObjectFromConfiguration(cryptoParams, accumuloConfiguration);
       this.cryptoParams = (BCFileCryptoModuleParameters) cryptoModule.generateNewRandomSessionKey(cryptoParams);
 
       this.secretKeyEncryptionStrategy = CryptoModuleFactory.getSecretKeyEncryptionStrategy(accumuloConfiguration);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java
new file mode 100644
index 0000000..7b8ec9e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedInputStream.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Reader corresponding to BlockedOutputStream. Expects all data to be in the form of size (int) data (size bytes) junk (however many bytes it takes to complete
+ * a block)
+ */
+public class BlockedInputStream extends InputStream {
+  byte[] array;
+  // ReadPos is where to start reading
+  // WritePos is the last position written to
+  int readPos, writePos;
+  DataInputStream in;
+  int blockSize;
+  boolean finished = false;
+
+  public BlockedInputStream(InputStream in, int blockSize, int maxSize) {
+    if (blockSize == 0)
+      throw new RuntimeException("Invalid block size");
+    if (in instanceof DataInputStream)
+      this.in = (DataInputStream) in;
+    else
+      this.in = new DataInputStream(in);
+
+    array = new byte[maxSize];
+    readPos = 0;
+    writePos = -1;
+
+    this.blockSize = blockSize;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (remaining() > 0)
+      return (array[readAndIncrement(1)] & 0xFF);
+    return -1;
+  }
+
+  private int readAndIncrement(int toAdd) {
+    int toRet = readPos;
+    readPos += toAdd;
+    if (readPos == array.length)
+      readPos = 0;
+    else if (readPos > array.length)
+      throw new RuntimeException("Unexpected state, this should only ever increase or cycle on the boundry!");
+    return toRet;
+  }
+
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    int toCopy = Math.min(len, remaining());
+    if (toCopy > 0) {
+      System.arraycopy(array, readPos, b, off, toCopy);
+      readAndIncrement(toCopy);
+    }
+    return toCopy;
+  }
+
+  private int remaining() throws IOException {
+    if (finished)
+      return -1;
+    if (available() == 0)
+      refill();
+
+    return available();
+  }
+
+  // Amount available to read
+  public int available() {
+    int toRet = writePos + 1 - readPos;
+    if (toRet < 0)
+      toRet += array.length;
+    return Math.min(array.length - readPos, toRet);
+  }
+
+  private boolean refill() throws IOException {
+    if (finished)
+      return false;
+    int size;
+    try {
+      size = in.readInt();
+    } catch (EOFException eof) {
+      finished = true;
+      return false;
+    }
+
+    // Shortcut for if we're reading garbage data
+    if (size < 0 || size > array.length) {
+      finished = true;
+      return false;
+    } else if (size == 0)
+      throw new RuntimeException("Empty block written, this shouldn't happen with this BlockedOutputStream.");
+
+    // We have already checked, not concerned with looping the buffer here
+    int bufferAvailable = array.length - readPos;
+    if (size > bufferAvailable) {
+      in.readFully(array, writePos + 1, bufferAvailable);
+      in.readFully(array, 0, size - bufferAvailable);
+    } else {
+      in.readFully(array, writePos + 1, size);
+    }
+    writePos += size;
+    if (writePos >= array.length - 1)
+      writePos -= array.length;
+
+    // Skip the cruft
+    int remainder = blockSize - ((size + 4) % blockSize);
+    if (remainder != blockSize) {
+      // If remainder isn't spilling the rest of the block, we know it's incomplete.
+      if (in.available() < remainder) {
+        undoWrite(size);
+        return false;
+      }
+      in.skip(remainder);
+    }
+
+    return true;
+  }
+
+  private void undoWrite(int size) {
+    writePos = writePos - size;
+    if (writePos < -1)
+      writePos += array.length;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    throw new UnsupportedOperationException();
+    // available(n);
+    // bb.position(bb.position()+(int)n);
+  }
+
+  @Override
+  public void close() throws IOException {
+    array = null;
+    in.close();
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    in.reset();
+    readPos = 0;
+    writePos = -1;
+  }
+
+  public boolean markSupported() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
new file mode 100644
index 0000000..9ca00b7
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+// Buffers all input in a growing buffer until flush() is called. Then entire buffer is written, with size information, and padding to force the underlying 
+// crypto output stream to also fully flush
+public class BlockedOutputStream extends OutputStream {
+  int blockSize;
+  DataOutputStream out;
+  ByteBuffer bb;
+
+  public BlockedOutputStream(OutputStream out, int blockSize, int bufferSize) {
+    if (bufferSize <= 0)
+      throw new IllegalArgumentException("bufferSize must be greater than 0.");
+    if (out instanceof DataOutputStream)
+      this.out = (DataOutputStream) out;
+    else
+      this.out = new DataOutputStream(out);
+    this.blockSize = blockSize;
+    int remainder = bufferSize % blockSize;
+    if (remainder != 0)
+      remainder = blockSize - remainder;
+    // some buffer space + bytes to make the buffer evened up with the cipher block size - 4 bytes for the size int
+    bb = ByteBuffer.allocate(bufferSize + remainder - 4);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    int size = bb.position();
+    if (size == 0)
+      return;
+    out.writeInt(size);
+
+    int remainder = ((size + 4) % blockSize);
+    if (remainder != 0)
+      remainder = blockSize - remainder;
+
+    // This is garbage
+    bb.position(bb.position() + remainder);
+    out.write(bb.array(), 0, size + remainder);
+
+    out.flush();
+    bb.rewind();
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    bb.put((byte) b);
+    if (bb.remaining() == 0)
+      flush();
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    if (bb.remaining() >= len) {
+      bb.put(b, off, len);
+      if (bb.remaining() == 0)
+        flush();
+    } else {
+      int remaining = bb.remaining();
+      write(b, off, remaining);
+      write(b, off + remaining, len - remaining);
+    }
+  }
+
+  @Override
+  public void write(byte b[]) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+    bb = null;
+    out.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
index 547cb8c..65acc6b 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
@@ -31,11 +31,11 @@ import org.apache.log4j.Logger;
  * 
  */
 public class CryptoModuleFactory {
-  
+
   private static Logger log = Logger.getLogger(CryptoModuleFactory.class);
   private static Map<String,CryptoModule> cryptoModulesCache = new HashMap<String,CryptoModule>();
   private static Map<String,SecretKeyEncryptionStrategy> secretKeyEncryptionStrategyCache = new HashMap<String,SecretKeyEncryptionStrategy>();
-  
+
   /**
    * This method returns a crypto module based on settings in the given configuration parameter.
    * 
@@ -47,17 +47,17 @@ public class CryptoModuleFactory {
     String cryptoModuleClassname = conf.get(Property.CRYPTO_MODULE_CLASS);
     return getCryptoModule(cryptoModuleClassname);
   }
-  
+
   public static CryptoModule getCryptoModule(String cryptoModuleClassname) {
-    
+
     if (cryptoModuleClassname != null) {
       cryptoModuleClassname = cryptoModuleClassname.trim();
     }
-    
+
     if (cryptoModuleClassname == null || cryptoModuleClassname.equals("NullCryptoModule")) {
       return new NullCryptoModule();
     }
-    
+
     CryptoModule cryptoModule = null;
     synchronized (cryptoModulesCache) {
       if (cryptoModulesCache.containsKey(cryptoModuleClassname)) {
@@ -67,14 +67,14 @@ public class CryptoModuleFactory {
         cryptoModulesCache.put(cryptoModuleClassname, cryptoModule);
       }
     }
-    
+
     return cryptoModule;
   }
-  
+
   @SuppressWarnings({"rawtypes"})
   private static CryptoModule instantiateCryptoModule(String cryptoModuleClassname) {
     log.debug(String.format("About to instantiate crypto module %s", cryptoModuleClassname));
-    
+
     CryptoModule cryptoModule = null;
     Class cryptoModuleClazz = null;
     try {
@@ -83,27 +83,27 @@ public class CryptoModuleFactory {
       log.warn(String.format("Could not find configured crypto module \"%s\".  No encryption will be used.", cryptoModuleClassname));
       return new NullCryptoModule();
     }
-    
+
     // Check if the given class implements the CryptoModule interface
     Class[] interfaces = cryptoModuleClazz.getInterfaces();
     boolean implementsCryptoModule = false;
-    
+
     for (Class clazz : interfaces) {
       if (clazz.equals(CryptoModule.class)) {
         implementsCryptoModule = true;
         break;
       }
     }
-    
+
     if (!implementsCryptoModule) {
       log.warn("Configured Accumulo crypto module \"" + cryptoModuleClassname + "\" does not implement the CryptoModule interface. No encryption will be used.");
       return new NullCryptoModule();
     } else {
       try {
         cryptoModule = (CryptoModule) cryptoModuleClazz.newInstance();
-        
+
         log.debug("Successfully instantiated crypto module " + cryptoModuleClassname);
-        
+
       } catch (InstantiationException e) {
         log.warn(String.format("Got instantiation exception %s when instantiating crypto module \"%s\".  No encryption will be used.", e.getCause().getClass()
             .getName(), cryptoModuleClassname));
@@ -118,22 +118,22 @@ public class CryptoModuleFactory {
     }
     return cryptoModule;
   }
-  
+
   public static SecretKeyEncryptionStrategy getSecretKeyEncryptionStrategy(AccumuloConfiguration conf) {
     String className = conf.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS);
     return getSecretKeyEncryptionStrategy(className);
   }
-  
+
   public static SecretKeyEncryptionStrategy getSecretKeyEncryptionStrategy(String className) {
-    
+
     if (className != null) {
       className = className.trim();
     }
-    
+
     if (className == null || className.equals("NullSecretKeyEncryptionStrategy")) {
       return new NullSecretKeyEncryptionStrategy();
     }
-    
+
     SecretKeyEncryptionStrategy strategy = null;
     synchronized (secretKeyEncryptionStrategyCache) {
       if (secretKeyEncryptionStrategyCache.containsKey(className)) {
@@ -143,15 +143,15 @@ public class CryptoModuleFactory {
         secretKeyEncryptionStrategyCache.put(className, strategy);
       }
     }
-    
+
     return strategy;
   }
-  
+
   @SuppressWarnings("rawtypes")
   private static SecretKeyEncryptionStrategy instantiateSecreteKeyEncryptionStrategy(String className) {
-    
+
     log.debug("About to instantiate secret key encryption strategy " + className);
-    
+
     SecretKeyEncryptionStrategy strategy = null;
     Class keyEncryptionStrategyClazz = null;
     try {
@@ -160,27 +160,27 @@ public class CryptoModuleFactory {
       log.warn(String.format("Could not find configured secret key encryption strategy \"%s\".  No encryption will be used.", className));
       return new NullSecretKeyEncryptionStrategy();
     }
-    
+
     // Check if the given class implements the CryptoModule interface
     Class[] interfaces = keyEncryptionStrategyClazz.getInterfaces();
     boolean implementsSecretKeyStrategy = false;
-    
+
     for (Class clazz : interfaces) {
       if (clazz.equals(SecretKeyEncryptionStrategy.class)) {
         implementsSecretKeyStrategy = true;
         break;
       }
     }
-    
+
     if (!implementsSecretKeyStrategy) {
       log.warn("Configured Accumulo secret key encryption strategy \"%s\" does not implement the SecretKeyEncryptionStrategy interface. No encryption will be used.");
       return new NullSecretKeyEncryptionStrategy();
     } else {
       try {
         strategy = (SecretKeyEncryptionStrategy) keyEncryptionStrategyClazz.newInstance();
-        
+
         log.debug("Successfully instantiated secret key encryption strategy " + className);
-        
+
       } catch (InstantiationException e) {
         log.warn(String.format("Got instantiation exception %s when instantiating secret key encryption strategy \"%s\".  No encryption will be used.", e
             .getCause().getClass().getName(), className));
@@ -195,85 +195,89 @@ public class CryptoModuleFactory {
     }
     return strategy;
   }
-  
+
   static class NullSecretKeyEncryptionStrategy implements SecretKeyEncryptionStrategy {
-    
+
     @Override
     public CryptoModuleParameters encryptSecretKey(CryptoModuleParameters params) {
       params.setEncryptedKey(params.getPlaintextKey());
       params.setOpaqueKeyEncryptionKeyID("");
-      
+
       return params;
     }
-    
+
     @Override
     public CryptoModuleParameters decryptSecretKey(CryptoModuleParameters params) {
       params.setPlaintextKey(params.getEncryptedKey());
       return params;
     }
-    
+
   }
-  
+
   static class NullCryptoModule implements CryptoModule {
-    
+
     @Override
     public CryptoModuleParameters getEncryptingOutputStream(CryptoModuleParameters params) throws IOException {
       params.setEncryptedOutputStream(params.getPlaintextOutputStream());
       return params;
     }
-    
+
     @Override
     public CryptoModuleParameters getDecryptingInputStream(CryptoModuleParameters params) throws IOException {
       params.setPlaintextInputStream(params.getEncryptedInputStream());
       return params;
     }
-    
+
     @Override
     public CryptoModuleParameters generateNewRandomSessionKey(CryptoModuleParameters params) {
       params.setPlaintextKey(new byte[0]);
       return params;
     }
-    
+
     @Override
     public CryptoModuleParameters initializeCipher(CryptoModuleParameters params) {
       return params;
     }
-    
+
   }
-  
+
   public static String[] parseCipherTransform(String cipherTransform) {
     if (cipherTransform == null) {
       return new String[3];
     }
-    
+
     return cipherTransform.split("/");
   }
-  
+
   public static CryptoModuleParameters createParamsObjectFromAccumuloConfiguration(AccumuloConfiguration conf) {
-    
+    CryptoModuleParameters params = new CryptoModuleParameters();
+
+    return fillParamsObjectFromConfiguration(params, conf);
+  }
+
+  public static CryptoModuleParameters fillParamsObjectFromConfiguration(CryptoModuleParameters params, AccumuloConfiguration conf) {
     // Get all the options from the configuration
     Map<String,String> cryptoOpts = conf.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
     cryptoOpts.putAll(conf.getAllPropertiesWithPrefix(Property.INSTANCE_PREFIX));
-    CryptoModuleParameters params = new CryptoModuleParameters();
-    
+    cryptoOpts.put(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey(), Integer.toString((int) conf.getMemoryInBytes(Property.CRYPTO_BLOCK_STREAM_SIZE)));
+
     return fillParamsObjectFromStringMap(params, cryptoOpts);
   }
-  
+
   public static CryptoModuleParameters fillParamsObjectFromStringMap(CryptoModuleParameters params, Map<String,String> cryptoOpts) {
-    
+
     // Parse the cipher suite for the mode and padding options
     String[] cipherTransformParts = parseCipherTransform(cryptoOpts.get(Property.CRYPTO_CIPHER_SUITE.getKey()));
-    
+
     // If no encryption has been specified, then we abort here.
     if (cipherTransformParts[0] == null || cipherTransformParts[0].equals("NullCipher")) {
       params.setAllOptions(cryptoOpts);
       params.setAlgorithmName("NullCipher");
       return params;
     }
-    
+
     params.setAllOptions(cryptoOpts);
-    
-    //
+
     params.setAlgorithmName(cryptoOpts.get(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey()));
     params.setEncryptionMode(cipherTransformParts[1]);
     params.setKeyEncryptionStrategyClass(cryptoOpts.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
@@ -283,8 +287,9 @@ public class CryptoModuleFactory {
     params.setPadding(cipherTransformParts[2]);
     params.setRandomNumberGenerator(cryptoOpts.get(Property.CRYPTO_SECURE_RNG.getKey()));
     params.setRandomNumberGeneratorProvider(cryptoOpts.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey()));
-    
+    params.setBlockStreamSize(Integer.parseInt(cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey())));
+
     return params;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java
index d9d48fe..64e88d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleParameters.java
@@ -109,8 +109,8 @@ public class CryptoModuleParameters {
   /**
    * Sets the name of the padding type to use for an encryption stream.
    * <p>
-   * Valid names are names recognized by your cryptographic engine provider. For the default Java provider, valid names would include things like
-   * "PKCS5Padding", "None", etc.
+   * Valid names are names recognized by your cryptographic engine provider. For the default Java provider, valid names would include things like "NoPadding",
+   * "None", etc.
    * <p>
    * For <b>encryption</b>, this value is <b>required</b> and is always used. Its value should be prepended or otherwise included with the ciphertext for future
    * decryption. <br>
@@ -575,6 +575,20 @@ public class CryptoModuleParameters {
   }
   
   /**
+   * Gets the size of the buffering stream that sits above the cipher stream
+   */
+  public int getBlockStreamSize() {
+    return blockStreamSize;
+  }
+
+  /**
+   * Sets the size of the buffering stream that sits above the cipher stream
+   */
+  public void setBlockStreamSize(int blockStreamSize) {
+    this.blockStreamSize = blockStreamSize;
+  }
+
+  /**
    * Gets the overall set of options for the {@link CryptoModule}.
    * 
    * @see CryptoModuleParameters#setAllOptions(Map)
@@ -625,5 +639,5 @@ public class CryptoModuleParameters {
   private byte[] initializationVector;
   
   private Map<String,String> allOptions;
-  
+  private int blockStreamSize;
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
index d14a424..347887c 100644
--- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
@@ -16,12 +16,11 @@
  */
 package org.apache.accumulo.core.security.crypto;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PushbackInputStream;
 import java.security.InvalidAlgorithmParameterException;
 import java.security.InvalidKeyException;
@@ -50,7 +49,8 @@ import org.apache.log4j.Logger;
  */
 public class DefaultCryptoModule implements CryptoModule {
   
-  private static final String ENCRYPTION_HEADER_MARKER = "---Log File Encrypted (v1)---";
+  private static final String ENCRYPTION_HEADER_MARKER_V1 = "---Log File Encrypted (v1)---";
+  private static final String ENCRYPTION_HEADER_MARKER_V2 = "---Log File Encrypted (v2)---";
   private static Logger log = Logger.getLogger(DefaultCryptoModule.class);
   
   public DefaultCryptoModule() {}
@@ -249,16 +249,17 @@ public class DefaultCryptoModule implements CryptoModule {
     }
     
     CipherOutputStream cipherOutputStream = new CipherOutputStream(params.getPlaintextOutputStream(), params.getCipher());
-    BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(cipherOutputStream);    
+    BlockedOutputStream blockedOutputStream = new BlockedOutputStream(cipherOutputStream, params.getCipher().getBlockSize(), params.getBlockStreamSize());
     
-    params.setEncryptedOutputStream(bufferedOutputStream);
+    
+    params.setEncryptedOutputStream(blockedOutputStream);
     
     if (params.getRecordParametersToStream()) {
       DataOutputStream dataOut = new DataOutputStream(params.getPlaintextOutputStream());
       
       // Write a marker to indicate this is an encrypted log file (in case we read it a plain one and need to
       // not try to decrypt it. Can happen during a failure when the log's encryption settings are changing.      
-      dataOut.writeUTF(ENCRYPTION_HEADER_MARKER);
+      dataOut.writeUTF(ENCRYPTION_HEADER_MARKER_V2);
       
       
       // Write out all the parameters
@@ -281,6 +282,7 @@ public class DefaultCryptoModule implements CryptoModule {
       dataOut.writeUTF(params.getOpaqueKeyEncryptionKeyID());
       dataOut.writeInt(params.getEncryptedKey().length);
       dataOut.write(params.getEncryptedKey());
+      dataOut.writeInt(params.getBlockStreamSize());
     }
     
     return params;
@@ -295,7 +297,7 @@ public class DefaultCryptoModule implements CryptoModule {
       log.trace("About to read encryption parameters from underlying stream");
       
       String marker = dataIn.readUTF();
-      if (marker.equals(ENCRYPTION_HEADER_MARKER)) {
+      if (marker.equals(ENCRYPTION_HEADER_MARKER_V1) || marker.equals(ENCRYPTION_HEADER_MARKER_V2)) {
         
         Map<String, String> paramsFromFile = new HashMap<String, String>();
         
@@ -350,6 +352,10 @@ public class DefaultCryptoModule implements CryptoModule {
         
         params = keyEncryptionStrategy.decryptSecretKey(params);
         
+        if (marker.equals(ENCRYPTION_HEADER_MARKER_V2))
+          params.setBlockStreamSize(dataIn.readInt());
+        else
+          params.setBlockStreamSize(-1);
       } else {
         
         log.trace("Read something off of the encrypted input stream that was not the encryption header marker, so pushing back bytes and returning the given stream");
@@ -390,12 +396,14 @@ public class DefaultCryptoModule implements CryptoModule {
       throw new RuntimeException(e);
     }   
     
+    InputStream blockedDecryptingInputStream = new CipherInputStream(params.getEncryptedInputStream(), cipher);
     
-    BufferedInputStream bufferedDecryptingInputStream = new BufferedInputStream(new CipherInputStream(params.getEncryptedInputStream(), cipher));
+    if (params.getBlockStreamSize() != -1)
+      blockedDecryptingInputStream = new BlockedInputStream(blockedDecryptingInputStream, cipher.getBlockSize(), params.getBlockStreamSize());
 
     log.trace("Initialized cipher input stream with transformation ["+getCipherTransformation(params)+"]");
     
-    params.setPlaintextInputStream(bufferedDecryptingInputStream);
+    params.setPlaintextInputStream(blockedDecryptingInputStream);
 
     return params;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
new file mode 100644
index 0000000..a68bdfd
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/NoFlushOutputStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.FilterOutputStream;
+import java.io.OutputStream;
+
+public class NoFlushOutputStream extends FilterOutputStream {
+
+  public NoFlushOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  @Override
+  public void flush() {}
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/test/filtered-resources/crypto-on-accumulo-site.xml
----------------------------------------------------------------------
diff --git a/core/src/test/filtered-resources/crypto-on-accumulo-site.xml b/core/src/test/filtered-resources/crypto-on-accumulo-site.xml
deleted file mode 100644
index 70345e0..0000000
--- a/core/src/test/filtered-resources/crypto-on-accumulo-site.xml
+++ /dev/null
@@ -1,162 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<configuration>
-<!--
-  Put your site-specific accumulo configurations here.
-
-  The available configuration values along with their defaults
-  are documented in docs/config.html
-
-  Unless you are simply testing at your workstation, you will most
-  definitely need to change the three entries below.
--->
-
-    <property>
-      <name>instance.zookeeper.host</name>
-      <value>localhost:2181</value>
-      <description>comma separated list of zookeeper servers</description>
-    </property>
-
-    <property>
-      <name>logger.dir.walog</name>
-      <value>walogs</value>
-      <description>The directory used to store write-ahead logs on the local filesystem. It is possible to specify a comma-separated list of directories.</description>
-    </property>
-
-    <property>
-      <name>instance.secret</name>
-      <value>DEFAULT</value>
-      <description>A secret unique to a given instance that all servers must know in order to communicate with one another.
-                   Change it before initialization. To change it later use ./bin/accumulo org.apache.accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd],
-                   and then update this file.
-      </description>
-    </property>
-
-    <property>
-      <name>tserver.memory.maps.max</name>
-      <value>80M</value>
-    </property>
-
-    <property>
-      <name>tserver.cache.data.size</name>
-      <value>7M</value>
-    </property>
-
-    <property>
-      <name>tserver.cache.index.size</name>
-      <value>20M</value>
-    </property>
-
-    <property>
-      <name>trace.password</name>
-      <!--
-        change this to the root user's password, and/or change the user below
-       -->
-      <value>password</value>
-    </property>
-
-    <property>
-      <name>trace.user</name>
-      <value>root</value>
-    </property>
-
-    <property>
-      <name>tserver.sort.buffer.size</name>
-      <value>50M</value>
-    </property>
-
-    <property>
-      <name>tserver.walog.max.size</name>
-      <value>100M</value>
-    </property>
-
-    <property>
-      <name>general.classpaths</name>
-      <value>
-    $ACCUMULO_HOME/server/target/classes/,
-    $ACCUMULO_HOME/core/target/classes/,
-    $ACCUMULO_HOME/start/target/classes/,
-    $ACCUMULO_HOME/fate/target/classes/,
-    $ACCUMULO_HOME/proxy/target/classes/,
-    $ACCUMULO_HOME/examples/target/classes/,
-    $ACCUMULO_HOME/lib/[^.].$ACCUMULO_VERSION.jar,
-    $ACCUMULO_HOME/lib/[^.].*.jar,
-    $ZOOKEEPER_HOME/zookeeper[^.].*.jar,
-    $HADOOP_CONF_DIR,
-    $HADOOP_PREFIX/[^.].*.jar,
-    $HADOOP_PREFIX/lib/[^.].*.jar,
-      </value>
-      <description>Classpaths that accumulo checks for updates and class files.
-      When using the Security Manager, please remove the ".../target/classes/" values.
-      </description>
-    </property>
-
-    <property>
-      <name>crypto.module.class</name>
-      <value>org.apache.accumulo.core.security.crypto.DefaultCryptoModule</value>
-    </property>
-    <property>
-      <name>crypto.cipher.suite</name>
-      <value>AES/CFB/PKCS5Padding</value>
-    </property>
-    <property>
-      <name>crypto.cipher.algorithm.name</name>
-      <value>AES</value>
-    </property>
-    <property>
-      <name>crypto.cipher.key.length</name>
-      <value>128</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng</name>
-      <value>SHA1PRNG</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng.provider</name>
-      <value>SUN</value>
-    </property>
-    <property>
-      <name>crypto.secret.key.encryption.strategy.class</name>
-      <value>org.apache.accumulo.core.security.crypto.DefaultSecretKeyEncryptionStrategy</value>
-    </property>
-    <property>
-      <name>instance.dfs.dir</name>
-      <value>${project.build.directory}</value>
-    </property>
-    <property>
-      <name>instance.dfs.uri</name>
-      <value>file://${project.build.directory}</value>
-    </property>
-
-    <property>
-      <name>crypto.default.key.strategy.hdfs.uri</name>
-      <value>file://${project.build.directory}</value>
-    </property>
-    <property>
-      <name>crypto.default.key.strategy.key.location</name>
-      <value>cryptoTestFile/test.secret.key</value>
-    </property>
-
-    <property>
-      <name>crypto.default.key.strategy.cipher.suite</name>
-      <value>AES/ECB/NoPadding</value>
-    </property>
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/test/filtered-resources/crypto-on-no-key-encryption-accumulo-site.xml
----------------------------------------------------------------------
diff --git a/core/src/test/filtered-resources/crypto-on-no-key-encryption-accumulo-site.xml b/core/src/test/filtered-resources/crypto-on-no-key-encryption-accumulo-site.xml
deleted file mode 100644
index 2fca01d..0000000
--- a/core/src/test/filtered-resources/crypto-on-no-key-encryption-accumulo-site.xml
+++ /dev/null
@@ -1,144 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<configuration>
-	<!--
-  Put your site-specific accumulo configurations here.
-
-  The available configuration values along with their defaults
-  are documented in docs/config.html
-
-  Unless you are simply testing at your workstation, you will most
-  definitely need to change the three entries below.
-	-->
-
-    <property>
-      <name>instance.zookeeper.host</name>
-      <value>localhost:2181</value>
-      <description>comma separated list of zookeeper servers</description>
-    </property>
-
-    <property>
-      <name>logger.dir.walog</name>
-      <value>walogs</value>
-      <description>The directory used to store write-ahead logs on the local filesystem. It is possible to specify a comma-separated list of directories.</description>
-    </property>
-
-    <property>
-      <name>instance.secret</name>
-      <value>DEFAULT</value>
-      <description>A secret unique to a given instance that all servers must know in order to communicate with one another.
-                   Change it before initialization. To change it later use ./bin/accumulo org.apache.accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd],
-                   and then update this file.
-      </description>
-    </property>
-
-    <property>
-      <name>tserver.memory.maps.max</name>
-      <value>80M</value>
-    </property>
-
-    <property>
-      <name>tserver.cache.data.size</name>
-      <value>7M</value>
-    </property>
-
-    <property>
-      <name>tserver.cache.index.size</name>
-      <value>20M</value>
-    </property>
-
-    <property>
-      <name>trace.password</name>
-      <!--
-        change this to the root user's password, and/or change the user below
-       -->
-      <value>password</value>
-    </property>
-
-    <property>
-      <name>trace.user</name>
-      <value>root</value>
-    </property>
-
-    <property>
-      <name>tserver.sort.buffer.size</name>
-      <value>50M</value>
-    </property>
-
-    <property>
-      <name>tserver.walog.max.size</name>
-      <value>100M</value>
-    </property>
-
-    <property>
-      <name>general.classpaths</name>
-      <value>
-    $ACCUMULO_HOME/server/target/classes/,
-    $ACCUMULO_HOME/core/target/classes/,
-    $ACCUMULO_HOME/start/target/classes/,
-    $ACCUMULO_HOME/fate/target/classes/,
-    $ACCUMULO_HOME/proxy/target/classes/,
-    $ACCUMULO_HOME/examples/target/classes/,
-	$ACCUMULO_HOME/lib/[^.].$ACCUMULO_VERSION.jar,
-	$ACCUMULO_HOME/lib/[^.].*.jar,
-	$ZOOKEEPER_HOME/zookeeper[^.].*.jar,
-	$HADOOP_CONF_DIR,
-	$HADOOP_PREFIX/[^.].*.jar,
-	$HADOOP_PREFIX/lib/[^.].*.jar,
-      </value>
-      <description>Classpaths that accumulo checks for updates and class files.
-      When using the Security Manager, please remove the ".../target/classes/" values.
-      </description>
-    </property>
-
-    <property>
-      <name>crypto.module.class</name>
-      <value>org.apache.accumulo.core.security.crypto.DefaultCryptoModule</value>
-    </property>
-    <property>
-      <name>crypto.cipher.suite</name>
-      <value>AES/CFB/PKCS5Padding</value>
-    </property>
-    <property>
-      <name>crypto.cipher.algorithm.name</name>
-      <value>AES</value>
-    </property>
-    <property>
-      <name>crypto.cipher.key.length</name>
-      <value>128</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng</name>
-      <value>SHA1PRNG</value>
-    </property>
-    <property>
-      <name>crypto.secure.rng.provider</name>
-      <value>SUN</value>
-    </property>
-    <property>
-      <name>instance.dfs.dir</name>
-      <value>${project.build.directory}</value>
-    </property>
-    <property>
-      <name>instance.dfs.uri</name>
-      <value>file://${project.build.directory}</value>
-    </property>
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
new file mode 100644
index 0000000..faba913
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.junit.Test;
+
+public class BlockedIOStreamTest {
+  @Test
+  public void testLargeBlockIO() throws IOException {
+    writeRead(1024, 2048);
+  }
+
+  private void writeRead(int blockSize, int expectedSize) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    BlockedOutputStream blockOut = new BlockedOutputStream(baos, blockSize, 1);
+
+    String contentString = "My Blocked Content String";
+    byte[] content = contentString.getBytes(Constants.UTF8);
+    blockOut.write(content);
+    blockOut.flush();
+
+    String contentString2 = "My Other Blocked Content String";
+    byte[] content2 = contentString2.getBytes(Constants.UTF8);
+    blockOut.write(content2);
+    blockOut.flush();
+
+    blockOut.close();
+    byte[] written = baos.toByteArray();
+    assertEquals(expectedSize, written.length);
+
+    ByteArrayInputStream biis = new ByteArrayInputStream(written);
+    BlockedInputStream blockIn = new BlockedInputStream(biis, blockSize, blockSize);
+    DataInputStream dIn = new DataInputStream(blockIn);
+
+    dIn.readFully(content, 0, content.length);
+    String readContentString = new String(content, Constants.UTF8);
+
+    assertEquals(contentString, readContentString);
+
+    dIn.readFully(content2, 0, content2.length);
+    String readContentString2 = new String(content2, Constants.UTF8);
+
+    assertEquals(contentString2, readContentString2);
+
+    blockIn.close();
+  }
+
+  @Test
+  public void testSmallBufferBlockedIO() throws IOException {
+    writeRead(16, (12 + 4) * (int) (Math.ceil(25.0/12) + Math.ceil(31.0/12)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
index d37a7e0..14c39b6 100644
--- a/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/CryptoTest.java
@@ -97,7 +97,7 @@ public class CryptoTest {
     assertNotNull(params);
     assertEquals("AES", params.getAlgorithmName());
     assertEquals("CFB", params.getEncryptionMode());
-    assertEquals("PKCS5Padding", params.getPadding());
+    assertEquals("NoPadding", params.getPadding());
     assertEquals(128, params.getKeyLength());
     assertEquals("SHA1PRNG", params.getRandomNumberGenerator());
     assertEquals("SUN", params.getRandomNumberGeneratorProvider());
@@ -206,7 +206,7 @@ public class CryptoTest {
   private byte[] setUpSampleEncryptedBytes(CryptoModule cryptoModule, CryptoModuleParameters params) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     
-    params.setPlaintextOutputStream(out);
+    params.setPlaintextOutputStream(new NoFlushOutputStream(out));
     
     params = cryptoModule.getEncryptingOutputStream(params);
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/test/resources/crypto-on-accumulo-site.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/crypto-on-accumulo-site.xml b/core/src/test/resources/crypto-on-accumulo-site.xml
index 9dc4aac..6200c48 100644
--- a/core/src/test/resources/crypto-on-accumulo-site.xml
+++ b/core/src/test/resources/crypto-on-accumulo-site.xml
@@ -114,7 +114,7 @@
     </property>
     <property>
       <name>crypto.cipher.suite</name>
-      <value>AES/CFB/PKCS5Padding</value>
+      <value>AES/CFB/NoPadding</value>
     </property>
     <property>
       <name>crypto.cipher.algorithm.name</name>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml b/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml
index 640abac..edbcfeb 100644
--- a/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml
+++ b/core/src/test/resources/crypto-on-no-key-encryption-accumulo-site.xml
@@ -114,7 +114,7 @@
     </property>
     <property>
       <name>crypto.cipher.suite</name>
-      <value>AES/CFB/PKCS5Padding</value>
+      <value>AES/CFB/NoPadding</value>
     </property>
     <property>
       <name>crypto.cipher.algorithm.name</name>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/443cba7a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index eee3f68..571d1bc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -43,9 +43,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.crypto.CryptoModule;
 import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
+import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.ServerConstants;
@@ -250,8 +252,7 @@ public class DfsLogger {
     if (Arrays.equals(magicBuffer, magic)) {
       // additional parameters it needs from the underlying stream.
       String cryptoModuleClassname = input.readUTF();
-      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-          .getCryptoModule(cryptoModuleClassname);
+      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 
       // Create the parameters and set the input stream into those parameters
       CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
@@ -367,7 +368,7 @@ public class DfsLogger {
 
       CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
       
-      params.setPlaintextOutputStream(logFile);
+      params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
       
       // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
       // so that that crypto module can re-read its own parameters.
@@ -397,6 +398,7 @@ public class DfsLogger {
       if (logFile != null)
         logFile.close();
       logFile = null;
+      encryptingLogFile = null;
       throw new IOException(ex);
     }
     
@@ -437,9 +439,9 @@ public class DfsLogger {
         }
     }
     
-    if (logFile != null)
+    if (encryptingLogFile != null)
       try {
-        logFile.close();
+        encryptingLogFile.close();
       } catch (IOException ex) {
         log.error(ex);
         throw new LogClosedException();
@@ -470,6 +472,7 @@ public class DfsLogger {
   private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
     key.write(encryptingLogFile);
     value.write(encryptingLogFile);
+    encryptingLogFile.flush();
   }
   
   public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {


Mime
View raw message