accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1438786 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/conf/ core/src/main/java/org/apache/accumulo/core/security/crypto/ server/src/main/java/org/apache/accumulo/server/tabletserver/log/
Date Sat, 26 Jan 2013 00:13:36 GMT
Author: vines
Date: Sat Jan 26 00:13:36 2013
New Revision: 1438786

URL: http://svn.apache.org/viewvc?rev=1438786&view=rev
Log:
ACCUMULO-958 - Adding Michael's patch. I did some formatting checks and added an increased default replication for the dfs secret key implementation he provided. I also did some additional code suppression due to the API provided being volatiile.


Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java   (with props)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java   (with props)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java   (with props)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java   (with props)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultSecretKeyEncryptionStrategy.java   (with props)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java   (with props)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategyContext.java   (with props)
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java?rev=1438786&r1=1438785&r2=1438786&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java Sat Jan 26 00:13:36 2013
@@ -16,7 +16,9 @@
  */
 package org.apache.accumulo.core.conf;
 
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -41,6 +43,25 @@ public abstract class AccumuloConfigurat
     }
   }
   
+  /**
+   * This method returns all properties in a map of string->string under the given prefix property.
+   * @param property the prefix property, and must be of type PropertyType.PREFIX
+   * @return a map of strings to strings of the resulting properties
+   */
+  public Map<String, String> getAllPropertiesWithPrefix(Property property) {
+    checkType(property, PropertyType.PREFIX);
+    
+    Map<String, String> propMap = new HashMap<String, String>(); 
+    
+    for (Entry<String, String> entry : this) {
+      if (entry.getKey().startsWith(property.getKey())) {
+        propMap.put(entry.getKey(), entry.getValue());
+      }
+    }
+    
+    return propMap;
+  }
+  
   public long getMemoryInBytes(Property property) {
     checkType(property, PropertyType.MEMORY);
     

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java?rev=1438786&r1=1438785&r2=1438786&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/ConfigSanityCheck.java Sat Jan 26 00:13:36 2013
@@ -30,8 +30,8 @@ public class ConfigSanityCheck {
       String key = entry.getKey();
       String value = entry.getValue();
       Property prop = Property.getPropertyByKey(entry.getKey());
-      if (prop == null && Property.isValidTablePropertyKey(key))
-        continue; // unknown valid per-table property
+      if (prop == null && Property.isValidPropertyKey(key))
+        continue; // unknown valid property (i.e. has proper prefix)
       else if (prop == null)
         log.warn(PREFIX + "unrecognized property key (" + key + ")");
       else if (prop.getType() == PropertyType.PREFIX)

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1438786&r1=1438785&r2=1438786&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Sat Jan 26 00:13:36 2013
@@ -27,6 +27,30 @@ import org.apache.accumulo.start.classlo
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 
 public enum Property {
+  
+  // Crypto-related properties
+  CRYPTO_PREFIX("crypto.", null, PropertyType.PREFIX, "Properties in this category related to the configuration of both default and custom crypto modules."),
+  CRYPTO_MODULE_CLASS(
+      "crypto.module.class",
+      "NullCryptoModule",
+      PropertyType.STRING,
+      "Fully qualified class name of the class that implements the CryptoModule interface, to be used in setting up encryption at rest for the WAL and (future) other parts of the code."),
+  CRYPTO_CIPHER_SUITE("crypto.cipher.suite", "NullCipher", PropertyType.STRING, "Describes the cipher suite to use for the write-ahead log"),
+  CRYPTO_CIPHER_ALGORITHM_NAME("crypto.cipher.algorithm.name", "NullCipher", PropertyType.STRING,
+      "States the name of the algorithm used in the corresponding cipher suite.  Do not make these different, unless you enjoy mysterious exceptions and bugs."),
+  CRYPTO_CIPHER_KEY_LENGTH("crypto.cipher.key.length", "128", PropertyType.STRING,
+      "Specifies the key length *in bits* to use for the symmetric key, should probably be 128 or 256 unless you really know what you're doing"),
+  CRYPTO_SECURE_RNG("crypto.secure.rng", "SHA1PRNG", PropertyType.STRING,
+      "States the secure random number generator to use, and defaults to the built-in Sun SHA1PRNG"),
+  CRYPTO_SECURE_RNG_PROVIDER("crypto.secure.rng.provider", "SUN", PropertyType.STRING,
+      "States the secure random number generator provider to use, and defaults to the built-in SUN provider"),
+  CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS("crypto.secret.key.encryption.strategy.class", "NullSecretKeyEncryptionStrategy", PropertyType.STRING,
+      "The class Accumulo should use for its key encryption strategy."),
+  CRYPTO_DEFAULT_KEY_STRATEGY_HDFS_URI("crypto.default.key.strategy.hdfs.uri", "", PropertyType.STRING,
+      "The URL Accumulo should use to connect to DFS. If this is blank, Accumulo will obtain this information from the Hadoop configuration"),
+  CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION("crypto.default.key.strategy.key.location", "/accumulo/crypto/secret/keyEncryptionKey", PropertyType.ABSOLUTEPATH,
+      "The absolute path of where to store the key encryption key within HDFS."),
+  
   // instance properties (must be the same for every node in an instance)
   INSTANCE_PREFIX("instance.", null, PropertyType.PREFIX,
       "Properties in this category must be consistent throughout a cloud. This is enforced and servers won't be able to communicate if these differ."),
@@ -56,8 +80,7 @@ public enum Property {
           + "starting in the first location to the last. Please note, hadoop conf and hadoop lib directories NEED to be here, "
           + "along with accumulo lib and zookeeper directory. Supports full regex on filename alone."), // needs special treatment in accumulo start jar
   GENERAL_DYNAMIC_CLASSPATHS(AccumuloVFSClassLoader.DYNAMIC_CLASSPATH_PROPERTY_NAME, AccumuloVFSClassLoader.DEFAULT_DYNAMIC_CLASSPATH_VALUE,
-      PropertyType.STRING,
-      "A list of all of the places where changes in jars or classes will force a reload of the classloader."),
+      PropertyType.STRING, "A list of all of the places where changes in jars or classes will force a reload of the classloader."),
   GENERAL_RPC_TIMEOUT("general.rpc.timeout", "120s", PropertyType.TIMEDURATION, "Time to wait on I/O for simple, short RPC calls"),
   GENERAL_KERBEROS_KEYTAB("general.kerberos.keytab", "", PropertyType.PATH, "Path to the kerberos keytab to use. Leave blank if not using kerberoized hdfs"),
   GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", PropertyType.STRING, "Name of the kerberos principal to use. _HOST will automatically be "
@@ -78,8 +101,10 @@ public enum Property {
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
       "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
-  MASTER_LEASE_RECOVERY_IMPLEMETATION("master.lease.recovery.implementation", "org.apache.accumulo.server.master.recovery.RecoverLease", PropertyType.CLASSNAME, "A class that implements a mechansim to steal write access to a file"),
-  MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT, "The number of threads used to run FAult-Tolerant Executions.  These are primarily table operations like merge."),
+  MASTER_LEASE_RECOVERY_IMPLEMETATION("master.lease.recovery.implementation", "org.apache.accumulo.server.master.recovery.RecoverLease",
+      PropertyType.CLASSNAME, "A class that implements a mechansim to steal write access to a file"),
+  MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT,
+      "The number of threads used to run FAult-Tolerant Executions.  These are primarily table operations like merge."),
   
   // properties that are specific to tablet server behavior
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"),
@@ -129,10 +154,7 @@ public enum Property {
   TSERV_BLOOM_LOAD_MAXCONCURRENT("tserver.bloom.load.concurrent.max", "4", PropertyType.COUNT,
       "The number of concurrent threads that will load bloom filters in the background. "
           + "Setting this to zero will make bloom filters load in the foreground."),
-  TSERV_MONITOR_FS(
-      "tserver.monitor.fs",
-      "true",
-      PropertyType.BOOLEAN,
+  TSERV_MONITOR_FS("tserver.monitor.fs", "true", PropertyType.BOOLEAN,
       "When enabled the tserver will monitor file systems and kill itself when one switches from rw to ro.  This is usually and indication that Linux has"
           + " detected a bad disk."),
   TSERV_MEMDUMP_DIR("tserver.dir.memdump", "/tmp", PropertyType.PATH,
@@ -168,7 +190,7 @@ public enum Property {
   TSERV_ARCHIVE_WALOGS("tserver.archive.walogs", "false", PropertyType.BOOLEAN, "Keep copies of the WALOGs for debugging purposes"),
   TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
       "The number of threads for the distributed workq.  These threads are used for copying failed bulk files."),
-
+  
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),
   LOGGER_DIR("logger.dir.walog", "walogs", PropertyType.PATH,
@@ -257,25 +279,29 @@ public enum Property {
       PropertyType.CLASSNAME,
       "A function that can transform the key prior to insertion and check of bloom filter.  org.apache.accumulo.core.file.keyfunctor.RowFunctor,"
           + ",org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor, and org.apache.accumulo.core.file.keyfunctor.ColumnQualifierFunctor are allowable values."
-          + " One can extend any of the above mentioned classes to perform specialized parsing of the key. "), TABLE_BLOOM_HASHTYPE("table.bloom.hash.type",
-      "murmur", PropertyType.STRING, "The bloom filter hash type"), TABLE_FAILURES_IGNORE("table.failures.ignore", "false", PropertyType.BOOLEAN,
+          + " One can extend any of the above mentioned classes to perform specialized parsing of the key. "),
+  TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING, "The bloom filter hash type"),
+  TABLE_FAILURES_IGNORE("table.failures.ignore", "false", PropertyType.BOOLEAN,
       "If you want queries for your table to hang or fail when data is missing from the system, "
           + "then set this to false. When this set to true missing data will be reported but queries "
-          + "will still run possibly returning a subset of the data."), TABLE_DEFAULT_SCANTIME_VISIBILITY("table.security.scan.visibility.default", "",
-      PropertyType.STRING, "The security label that will be assumed at scan time if an entry does not have a visibility set.<br />"
+          + "will still run possibly returning a subset of the data."),
+  TABLE_DEFAULT_SCANTIME_VISIBILITY("table.security.scan.visibility.default", "", PropertyType.STRING,
+      "The security label that will be assumed at scan time if an entry does not have a visibility set.<br />"
           + "Note: An empty security label is displayed as []. The scan results will show an empty visibility even if "
           + "the visibility from this setting is applied to the entry.<br />"
           + "CAUTION: If a particular key has an empty security label AND its table's default visibility is also empty, "
           + "access will ALWAYS be granted for users with permission to that table. Additionally, if this field is changed, "
-          + "all existing data with an empty visibility label will be interpreted with the new label on the next scan."), TABLE_LOCALITY_GROUPS(
-      "table.groups.enabled", "", PropertyType.STRING, "A comma separated list of locality group names to enable for this table."), TABLE_CONSTRAINT_PREFIX(
-      "table.constraint.", null, PropertyType.PREFIX, "Properties in this category are per-table properties that add constraints to a table. "
+          + "all existing data with an empty visibility label will be interpreted with the new label on the next scan."),
+  TABLE_LOCALITY_GROUPS("table.groups.enabled", "", PropertyType.STRING, "A comma separated list of locality group names to enable for this table."),
+  TABLE_CONSTRAINT_PREFIX("table.constraint.", null, PropertyType.PREFIX,
+      "Properties in this category are per-table properties that add constraints to a table. "
           + "These properties start with the category prefix, followed by a number, and their values "
           + "correspond to a fully qualified Java class that implements the Constraint interface.<br />"
           + "For example, table.constraint.1 = org.apache.accumulo.core.constraints.MyCustomConstraint "
-          + "and table.constraint.2 = my.package.constraints.MySecondConstraint"), TABLE_INDEXCACHE_ENABLED("table.cache.index.enable", "true",
-      PropertyType.BOOLEAN, "Determines whether index cache is enabled."), TABLE_BLOCKCACHE_ENABLED("table.cache.block.enable", "false", PropertyType.BOOLEAN,
-      "Determines whether file block cache is enabled."), TABLE_ITERATOR_PREFIX("table.iterator.", null, PropertyType.PREFIX,
+          + "and table.constraint.2 = my.package.constraints.MySecondConstraint"),
+  TABLE_INDEXCACHE_ENABLED("table.cache.index.enable", "true", PropertyType.BOOLEAN, "Determines whether index cache is enabled."),
+  TABLE_BLOCKCACHE_ENABLED("table.cache.block.enable", "false", PropertyType.BOOLEAN, "Determines whether file block cache is enabled."),
+  TABLE_ITERATOR_PREFIX("table.iterator.", null, PropertyType.PREFIX,
       "Properties in this category specify iterators that are applied at various stages (scopes) of interaction "
           + "with a table. These properties start with the category prefix, followed by a scope (minc, majc, scan, etc.), "
           + "followed by a period, followed by a name, as in table.iterator.scan.vers, or table.iterator.scan.custom. "
@@ -283,31 +309,31 @@ public enum Property {
           + "such as table.iterator.scan.vers = 10,org.apache.accumulo.core.iterators.VersioningIterator<br /> "
           + "These iterators can take options if additional properties are set that look like this property, "
           + "but are suffixed with a period, followed by 'opt' followed by another period, and a property name.<br />"
-          + "For example, table.iterator.minc.vers.opt.maxVersions = 3"), TABLE_LOCALITY_GROUP_PREFIX("table.group.", null, PropertyType.PREFIX,
+          + "For example, table.iterator.minc.vers.opt.maxVersions = 3"),
+  TABLE_LOCALITY_GROUP_PREFIX("table.group.", null, PropertyType.PREFIX,
       "Properties in this category are per-table properties that define locality groups in a table. These properties start "
           + "with the category prefix, followed by a name, followed by a period, and followed by a property for that group.<br />"
           + "For example table.group.group1=x,y,z sets the column families for a group called group1. Once configured, "
           + "group1 can be enabled by adding it to the list of groups in the " + TABLE_LOCALITY_GROUPS.getKey() + " property.<br />"
           + "Additional group options may be specified for a named group by setting table.group.&lt;name&gt;.opt.&lt;key&gt;=&lt;value&gt;."),
-  TABLE_FORMATTER_CLASS("table.formatter", DefaultFormatter.class.getName(), PropertyType.STRING,
-      "The Formatter class to apply on results in the shell"),
+  TABLE_FORMATTER_CLASS("table.formatter", DefaultFormatter.class.getName(), PropertyType.STRING, "The Formatter class to apply on results in the shell"),
   TABLE_INTERPRETER_CLASS("table.interepreter", DefaultScanInterpreter.class.getName(), PropertyType.STRING,
       "The ScanInterpreter class to apply on scan arguments in the shell"),
   TABLE_CLASSPATH("table.classpath.context", "", PropertyType.STRING, "Per table classpath"),
-      
-      
-  //VFS ClassLoader properties
+  
+  // VFS ClassLoader properties
   VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "", PropertyType.STRING,
       "Configuration for a system level vfs classloader.  Accumulo jar can be configured here and loaded out of HDFS."),
-  VFS_CONTEXT_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CONTEXT_CLASSPATH_PROPERTY, null, PropertyType.PREFIX,
+  VFS_CONTEXT_CLASSPATH_PROPERTY(
+      AccumuloVFSClassLoader.VFS_CONTEXT_CLASSPATH_PROPERTY,
+      null,
+      PropertyType.PREFIX,
       "Properties in this category are define a classpath. These properties start  with the category prefix, followed by a context name.  "
           + "The value is a comma seperated list of URIs. Supports full regex on filename alone. For example general.vfs.context.classpath.cx1=hdfs://nn1:9902/mylibdir/*.jar.  "
           + "You can enable post delegation for a context, which will load classes from the context first instead of the parent first.  "
           + "Do this by setting general.vfs.context.classpath.<name>.delegation=post, where <name> is your context name.  "
           + "If delegation is not specified, it defaults to loading from parent classloader first.");
-      
   
-
   private String key, defaultValue, description;
   private PropertyType type;
   
@@ -339,15 +365,44 @@ public enum Property {
   }
   
   private static HashSet<String> validTableProperties = null;
+  private static HashSet<String> validProperties = null;
+  private static HashSet<String> validPrefixes = null;
+  
+  private static boolean isKeyValidlyPrefixed(String key) {
+    for (String prefix : validPrefixes) {
+      if (key.startsWith(prefix))
+        return true;
+    }
+    
+    return false;
+  }
+  
+  public synchronized static boolean isValidPropertyKey(String key) {
+    if (validProperties == null) {
+      validProperties = new HashSet<String>();
+      validPrefixes = new HashSet<String>();
+      
+      for (Property p : Property.values()) {
+        if (p.getType().equals(PropertyType.PREFIX)) {
+          validPrefixes.add(p.getKey());
+        } else {
+          validProperties.add(p.getKey());
+        }
+      }
+    }
+    
+    return validProperties.contains(key) || isKeyValidlyPrefixed(key);
+  }
   
+  // Is this method still needed?
   public synchronized static boolean isValidTablePropertyKey(String key) {
-      if (validTableProperties == null) {
-  	    validTableProperties = new HashSet<String>();
-        for (Property p : Property.values()) {
-          if (!p.getType().equals(PropertyType.PREFIX) && p.getKey().startsWith(Property.TABLE_PREFIX.getKey())) {
-        	  validTableProperties.add(p.getKey());
-          }
+    if (validTableProperties == null) {
+      validTableProperties = new HashSet<String>();
+      for (Property p : Property.values()) {
+        if (!p.getType().equals(PropertyType.PREFIX) && p.getKey().startsWith(Property.TABLE_PREFIX.getKey())) {
+          validTableProperties.add(p.getKey());
         }
+      }
     }
     
     return validTableProperties.contains(key) || key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Classes that obey this interface may be used to provide encrypting and decrypting streams to the rest of Accumulo. Classes that obey this interface may be
+ * configured as the crypto module by setting the property crypto.module.class in the accumulo-site.xml file.
+ * 
+ * Note that this first iteration of this API is considered deprecated because we anticipate it changing in non-backwards compatible ways as we explore the
+ * requirements for encryption in Accumulo. So, your mileage is gonna vary a lot as we go forward.
+ * 
+ */
+@Deprecated
+public interface CryptoModule {
+  
+  public enum CryptoInitProperty {
+    ALGORITHM_NAME("algorithm.name"), 
+    CIPHER_SUITE("cipher.suite"), 
+    INITIALIZATION_VECTOR("initialization.vector"), 
+    PLAINTEXT_SESSION_KEY("plaintext.session.key");
+    
+    private CryptoInitProperty(String name) {
+      key = name;
+    }
+    
+    private String key;
+    
+    public String getKey() {
+      return key;
+    }
+  }
+  
+  /**
+   * Wraps an OutputStream in an encrypting OutputStream. The given map contains the settings for the cryptographic algorithm to use. <b>Callers of this method
+   * should expect that the given OutputStream will be written to before cryptographic writes occur.</b> These writes contain the cryptographic information used
+   * to encrypt the following bytes (these data include the initialization vector, encrypted session key, and so on). If writing arbitrarily to the underlying
+   * stream is not desirable, users should call the other flavor of getEncryptingOutputStream which accepts these data as parameters.
+   * 
+   * @param out
+   *          the OutputStream to wrap
+   * @param conf
+   *          the cryptographic parameters to use; specific string names to look for will depend on the various implementations
+   * @return an OutputStream that wraps the given parameter
+   * @throws IOException
+   */
+  public OutputStream getEncryptingOutputStream(OutputStream out, Map<String,String> cryptoOpts) throws IOException;
+  
+  /**
+   * Wraps an InputStream and returns a decrypting input stream. The given map contains the settings for the intended cryptographic operations, but implementors
+   * should take care to ensure that the crypto from the given input stream matches their expectations about what they will use to decrypt it, as the parameters
+   * may have changed. Also, care should be taken around transitioning between non-encrypting and encrypting streams; implementors should handle the case where
+   * the given input stream is <b>not</b> encrypted at all.
+   * 
+   * It is expected that this version of getDecryptingInputStream is called in conjunction with the getEncryptingOutputStream from above. It should expect its
+   * input streams to contain the data written by getEncryptingOutputStream.
+   * 
+   * @param in
+   *          the InputStream to wrap
+   * @param conf
+   *          the cryptographic parameters to use; specific string names to look for will depend on the various implementations
+   * @return an InputStream that wraps the given parameter
+   * @throws IOException
+   */
+  public InputStream getDecryptingInputStream(InputStream in, Map<String,String> cryptoOpts) throws IOException;
+  
+  /**
+   * Wraps an OutputStream in an encrypting OutputStream. The given map contains the settings for the cryptographic algorithm to use. The cryptoInitParams map
+   * contains all the cryptographic details to construct a key (or keys), initialization vectors, etc. and use them to properly initialize the stream for
+   * writing. These initialization parameters must be persisted elsewhere, along with the cryptographic configuration (algorithm, mode, etc.), so that they may
+   * be read in at the time of reading the encrypted content.
+   * 
+   * @param out
+   *          the OutputStream to wrap
+   * @param conf
+   *          the cryptographic algorithm configuration
+   * @param cryptoInitParams
+   *          the initialization parameters for the algorithm, usually including initialization vector and session key
+   * @return a wrapped output stream
+   */
+  public OutputStream getEncryptingOutputStream(OutputStream out, Map<String,String> conf, Map<CryptoModule.CryptoInitProperty,Object> cryptoInitParams);
+  
+  /**
+   * Wraps an InputStream and returns a decrypting input stream. The given map contains the settings for the intended cryptographic operations, but implementors
+   * should take care to ensure that the crypto from the given input stream matches their expectations about what they will use to decrypt it, as the parameters
+   * may have changed. Also, care should be taken around transitioning between non-encrypting and encrypting streams; implementors should handle the case where
+   * the given input stream is <b>not</b> encrypted at all.
+   * 
+   * The cryptoInitParams contains all necessary information to properly initialize the given cipher, usually including things like initialization vector and
+   * secret key.
+   * 
+   * @param in
+   * @param cryptoOpts
+   * @param cryptoInitParams
+   * @return
+   * @throws IOException
+   */
+  public InputStream getDecryptingInputStream(InputStream in, Map<String,String> cryptoOpts, Map<CryptoModule.CryptoInitProperty,Object> cryptoInitParams)
+      throws IOException;
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModule.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.log4j.Logger;
+
+/**
+ * This factory module exists to assist other classes in loading crypto modules.
+ */
+public class CryptoModuleFactory {
+  
+  private static Logger log = Logger.getLogger(CryptoModuleFactory.class);
+  
+  /**
+   * This method returns a crypto module based on settings in the given configuration parameter.
+   * 
+   * @param conf
+   * @return a class implementing the CryptoModule interface. It will *never* return null; rather, it will return a class which obeys the interface but makes no
+   *         changes to the underlying data.
+   */
+  
+  @SuppressWarnings("deprecation")
+  public static CryptoModule getCryptoModule(AccumuloConfiguration conf) {
+    String cryptoModuleClassname = conf.get(Property.CRYPTO_MODULE_CLASS);
+    return getCryptoModule(cryptoModuleClassname);
+  }
+  
+  @SuppressWarnings({"rawtypes", "deprecation"})
+  public static CryptoModule getCryptoModule(String cryptoModuleClassname) {
+    log.debug(String.format("About to instantiate crypto module %s", cryptoModuleClassname));
+    
+    if (cryptoModuleClassname.equals("NullCryptoModule")) {
+      return new NullCryptoModule();
+    }
+    
+    CryptoModule cryptoModule = null;
+    Class cryptoModuleClazz = null;
+    try {
+      cryptoModuleClazz = AccumuloVFSClassLoader.loadClass(cryptoModuleClassname);
+    } catch (ClassNotFoundException e1) {
+      log.warn(String.format("Could not find configured crypto module \"%s\".  NO ENCRYPTION WILL BE USED.", cryptoModuleClassname));
+      return new NullCryptoModule();
+    }
+    
+    // Check if the given class implements the CryptoModule interface
+    Class[] interfaces = cryptoModuleClazz.getInterfaces();
+    boolean implementsCryptoModule = false;
+    
+    for (Class clazz : interfaces) {
+      if (clazz.equals(CryptoModule.class)) {
+        implementsCryptoModule = true;
+        break;
+      }
+    }
+    
+    if (!implementsCryptoModule) {
+      log.warn("Configured Accumulo crypto module \"%s\" does not implement the CryptoModule interface. NO ENCRYPTION WILL BE USED.");
+      return new NullCryptoModule();
+    } else {
+      try {
+        cryptoModule = (CryptoModule) cryptoModuleClazz.newInstance();
+        
+        log.debug("Successfully instantiated crypto module");
+        
+      } catch (InstantiationException e) {
+        log.warn(String.format("Got instantiation exception %s when instantiating crypto module \"%s\".  NO ENCRYPTION WILL BE USED.", e.getCause().getClass()
+            .getCanonicalName(), cryptoModuleClassname));
+        log.warn(e.getCause());
+        return new NullCryptoModule();
+      } catch (IllegalAccessException e) {
+        log.warn(String.format("Got illegal access exception when trying to instantiate crypto module \"%s\".  NO ENCRYPTION WILL BE USED.",
+            cryptoModuleClassname));
+        log.warn(e);
+        return new NullCryptoModule();
+      }
+    }
+    return cryptoModule;
+  }
+  
+  public static SecretKeyEncryptionStrategy getSecretKeyEncryptionStrategy(AccumuloConfiguration conf) {
+    String className = conf.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS);
+    return getSecretKeyEncryptionStrategy(className);
+  }
+  
+  @SuppressWarnings("rawtypes")
+  public static SecretKeyEncryptionStrategy getSecretKeyEncryptionStrategy(String className) {
+    if (className == null || className.equals("NullSecretKeyEncryptionStrategy")) {
+      return new NullSecretKeyEncryptionStrategy();
+    }
+    
+    SecretKeyEncryptionStrategy strategy = null;
+    Class keyEncryptionStrategyClazz = null;
+    try {
+      keyEncryptionStrategyClazz = AccumuloVFSClassLoader.loadClass(className);
+    } catch (ClassNotFoundException e1) {
+      log.warn(String.format("Could not find configured secret key encryption strategy \"%s\".  NO ENCRYPTION WILL BE USED.", className));
+      return new NullSecretKeyEncryptionStrategy();
+    }
+    
+    // Check if the given class implements the CryptoModule interface
+    Class[] interfaces = keyEncryptionStrategyClazz.getInterfaces();
+    boolean implementsSecretKeyStrategy = false;
+    
+    for (Class clazz : interfaces) {
+      if (clazz.equals(SecretKeyEncryptionStrategy.class)) {
+        implementsSecretKeyStrategy = true;
+        break;
+      }
+    }
+    
+    if (!implementsSecretKeyStrategy) {
+      log.warn("Configured Accumulo secret key encryption strategy \"%s\" does not implement the SecretKeyEncryptionStrategy interface. NO ENCRYPTION WILL BE USED.");
+      return new NullSecretKeyEncryptionStrategy();
+    } else {
+      try {
+        strategy = (SecretKeyEncryptionStrategy) keyEncryptionStrategyClazz.newInstance();
+        
+        log.debug("Successfully instantiated secret key encryption strategy");
+        
+      } catch (InstantiationException e) {
+        log.warn(String.format("Got instantiation exception %s when instantiating secret key encryption strategy \"%s\".  NO ENCRYPTION WILL BE USED.", e
+            .getCause().getClass().getCanonicalName(), className));
+        log.warn(e.getCause());
+        return new NullSecretKeyEncryptionStrategy();
+      } catch (IllegalAccessException e) {
+        log.warn(String.format("Got illegal access exception when trying to instantiate secret key encryption strategy \"%s\".  NO ENCRYPTION WILL BE USED.",
+            className));
+        log.warn(e);
+        return new NullSecretKeyEncryptionStrategy();
+      }
+    }
+    
+    return strategy;
+  }
+  
+  private static class NullSecretKeyEncryptionStrategy implements SecretKeyEncryptionStrategy {
+    
+    @Override
+    public SecretKeyEncryptionStrategyContext encryptSecretKey(SecretKeyEncryptionStrategyContext context) {
+      context.setEncryptedSecretKey(context.getPlaintextSecretKey());
+      context.setOpaqueKeyEncryptionKeyID("");
+      
+      return context;
+    }
+    
+    @Override
+    public SecretKeyEncryptionStrategyContext decryptSecretKey(SecretKeyEncryptionStrategyContext context) {
+      context.setPlaintextSecretKey(context.getEncryptedSecretKey());
+      
+      return context;
+    }
+    
+    public SecretKeyEncryptionStrategyContext getNewContext() {
+      return new SecretKeyEncryptionStrategyContext() {
+        
+        public byte[] getPlaintextSecretKey() {
+          return plaintextSecretKey;
+        }
+        
+        public void setPlaintextSecretKey(byte[] plaintextSecretKey) {
+          this.plaintextSecretKey = plaintextSecretKey;
+        }
+        
+        public byte[] getEncryptedSecretKey() {
+          return encryptedSecretKey;
+        }
+        
+        public void setEncryptedSecretKey(byte[] encryptedSecretKey) {
+          this.encryptedSecretKey = encryptedSecretKey;
+        }
+        
+        public String getOpaqueKeyEncryptionKeyID() {
+          return opaqueKeyEncryptionKeyID;
+        }
+        
+        public void setOpaqueKeyEncryptionKeyID(String opaqueKeyEncryptionKeyID) {
+          this.opaqueKeyEncryptionKeyID = opaqueKeyEncryptionKeyID;
+        }
+        
+        public Map<String,String> getContext() {
+          return context;
+        }
+        
+        public void setContext(Map<String,String> context) {
+          this.context = context;
+        }
+        
+        private byte[] plaintextSecretKey;
+        private byte[] encryptedSecretKey;
+        private String opaqueKeyEncryptionKeyID;
+        private Map<String,String> context;
+      };
+    }
+    
+  }
+  
+  @SuppressWarnings("deprecation")
+  private static class NullCryptoModule implements CryptoModule {
+    
+    @Override
+    public OutputStream getEncryptingOutputStream(OutputStream out, Map<String,String> cryptoOpts) throws IOException {
+      return out;
+    }
+    
+    @Override
+    public InputStream getDecryptingInputStream(InputStream in, Map<String,String> cryptoOpts) throws IOException {
+      return in;
+    }
+    
+    @Override
+    public OutputStream getEncryptingOutputStream(OutputStream out, Map<String,String> conf, Map<CryptoInitProperty,Object> cryptoInitParams) {
+      return out;
+    }
+    
+    @Override
+    public InputStream getDecryptingInputStream(InputStream in, Map<String,String> cryptoOpts, Map<CryptoInitProperty,Object> cryptoInitParams)
+        throws IOException {
+      return in;
+    }
+    
+  }
+  
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.log4j.Logger;
+
+/**
+ * This class contains the gritty details around setting up encrypted streams for reading and writing the log file. It obeys the interface CryptoModule, which
+ * other developers can implement to change out this logic as necessary.
+ * 
+ */
+
+@SuppressWarnings("deprecation")
+public class DefaultCryptoModule implements CryptoModule {
+  
+  // This is how *I* like to format my variable declarations. Your mileage may vary.
+  
+  private static final String ENCRYPTION_HEADER_MARKER = "---Log File Encrypted (v1)---";
+  private static Logger log = Logger.getLogger(DefaultCryptoModule.class);
+  
+  public DefaultCryptoModule() {}
+  
+  @Override
+  public OutputStream getEncryptingOutputStream(OutputStream out, Map<String,String> cryptoOpts) throws IOException {
+    
+    log.debug("Initializing crypto output stream");
+    
+    String cipherSuite = cryptoOpts.get(Property.CRYPTO_CIPHER_SUITE.getKey());
+    
+    if (cipherSuite.equals("NullCipher")) {
+      return out;
+    }
+    
+    String algorithmName = cryptoOpts.get(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey());
+    String secureRNG = cryptoOpts.get(Property.CRYPTO_SECURE_RNG.getKey());
+    String secureRNGProvider = cryptoOpts.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey());
+    SecureRandom secureRandom = DefaultCryptoModuleUtils.getSecureRandom(secureRNG, secureRNGProvider);
+    int keyLength = Integer.parseInt(cryptoOpts.get(Property.CRYPTO_CIPHER_KEY_LENGTH.getKey()));
+    
+    byte[] randomKey = new byte[keyLength / 8];
+    
+    Map<CryptoInitProperty,Object> cryptoInitParams = new HashMap<CryptoInitProperty,Object>();
+    
+    secureRandom.nextBytes(randomKey);
+    cryptoInitParams.put(CryptoInitProperty.PLAINTEXT_SESSION_KEY, randomKey);
+    
+    SecretKeyEncryptionStrategy keyEncryptionStrategy = CryptoModuleFactory.getSecretKeyEncryptionStrategy(cryptoOpts
+        .get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
+    SecretKeyEncryptionStrategyContext keyEncryptionStrategyContext = keyEncryptionStrategy.getNewContext();
+    
+    keyEncryptionStrategyContext.setPlaintextSecretKey(randomKey);
+    keyEncryptionStrategyContext.setContext(cryptoOpts);
+    
+    keyEncryptionStrategyContext = keyEncryptionStrategy.encryptSecretKey(keyEncryptionStrategyContext);
+    
+    byte[] encryptedRandomKey = keyEncryptionStrategyContext.getEncryptedSecretKey();
+    String opaqueId = keyEncryptionStrategyContext.getOpaqueKeyEncryptionKeyID();
+    
+    OutputStream cipherOutputStream = getEncryptingOutputStream(out, cryptoOpts, cryptoInitParams);
+    
+    // Get the IV from the init params, since we didn't create it but the other getEncryptingOutputStream did
+    byte[] initVector = (byte[]) cryptoInitParams.get(CryptoInitProperty.INITIALIZATION_VECTOR);
+    
+    DataOutputStream dataOut = new DataOutputStream(out);
+    
+    // Write a marker to indicate this is an encrypted log file (in case we read it a plain one and need to
+    // not try to decrypt it. Can happen during a failure when the log's encryption settings are changing.
+    dataOut.writeUTF(ENCRYPTION_HEADER_MARKER);
+    
+    // Write out the cipher suite and algorithm used to encrypt this file. In case the admin changes, we want to still
+    // decode the old format.
+    dataOut.writeUTF(cipherSuite);
+    dataOut.writeUTF(algorithmName);
+    
+    // Write the init vector to the log file
+    dataOut.writeInt(initVector.length);
+    dataOut.write(initVector);
+    
+    // Write out the encrypted session key and the opaque ID
+    dataOut.writeUTF(opaqueId);
+    dataOut.writeInt(encryptedRandomKey.length);
+    dataOut.write(encryptedRandomKey);
+    
+    // Write the secret key (encrypted) into the log file
+    // dataOut.writeInt(randomKey.length);
+    // dataOut.write(randomKey);
+    
+    return cipherOutputStream;
+  }
+  
+  @Override
+  public InputStream getDecryptingInputStream(InputStream in, Map<String,String> cryptoOpts) throws IOException {
+    DataInputStream dataIn = new DataInputStream(in);
+    
+    String marker = dataIn.readUTF();
+    
+    log.debug("Read encryption header");
+    if (marker.equals(ENCRYPTION_HEADER_MARKER)) {
+      
+      String cipherSuiteFromFile = dataIn.readUTF();
+      String algorithmNameFromFile = dataIn.readUTF();
+      
+      // Read the secret key and initialization vector from the file
+      int initVectorLength = dataIn.readInt();
+      byte[] initVector = new byte[initVectorLength];
+      dataIn.read(initVector, 0, initVectorLength);
+      
+      // Read the opaque ID and encrypted session key
+      String opaqueId = dataIn.readUTF();
+      int encryptedSecretKeyLength = dataIn.readInt();
+      byte[] encryptedSecretKey = new byte[encryptedSecretKeyLength];
+      dataIn.read(encryptedSecretKey);
+      
+      SecretKeyEncryptionStrategy keyEncryptionStrategy = CryptoModuleFactory.getSecretKeyEncryptionStrategy(cryptoOpts
+          .get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
+      SecretKeyEncryptionStrategyContext keyEncryptionStrategyContext = keyEncryptionStrategy.getNewContext();
+      
+      keyEncryptionStrategyContext.setOpaqueKeyEncryptionKeyID(opaqueId);
+      keyEncryptionStrategyContext.setContext(cryptoOpts);
+      keyEncryptionStrategyContext.setEncryptedSecretKey(encryptedSecretKey);
+      
+      keyEncryptionStrategyContext = keyEncryptionStrategy.decryptSecretKey(keyEncryptionStrategyContext);
+      
+      byte[] secretKey = keyEncryptionStrategyContext.getPlaintextSecretKey();
+      
+      // int secretKeyLength = dataIn.readInt();
+      // byte[] secretKey = new byte[secretKeyLength];
+      // dataIn.read(secretKey, 0, secretKeyLength);
+      
+      Map<CryptoModule.CryptoInitProperty,Object> cryptoInitParams = new HashMap<CryptoModule.CryptoInitProperty,Object>();
+      cryptoInitParams.put(CryptoInitProperty.CIPHER_SUITE, cipherSuiteFromFile);
+      cryptoInitParams.put(CryptoInitProperty.ALGORITHM_NAME, algorithmNameFromFile);
+      cryptoInitParams.put(CryptoInitProperty.PLAINTEXT_SESSION_KEY, secretKey);
+      cryptoInitParams.put(CryptoInitProperty.INITIALIZATION_VECTOR, initVector);
+      
+      InputStream cipherInputStream = getDecryptingInputStream(dataIn, cryptoOpts, cryptoInitParams);
+      return cipherInputStream;
+      
+    } else {
+      // Push these bytes back on to the stream. This method is a bit roundabout but isolates our code
+      // from having to understand the format that DataOuputStream uses for its bytes.
+      ByteArrayOutputStream tempByteOut = new ByteArrayOutputStream();
+      DataOutputStream tempOut = new DataOutputStream(tempByteOut);
+      tempOut.writeUTF(marker);
+      
+      byte[] bytesToPutBack = tempByteOut.toByteArray();
+      
+      PushbackInputStream pushbackStream = new PushbackInputStream(in, bytesToPutBack.length);
+      pushbackStream.unread(bytesToPutBack);
+      
+      return pushbackStream;
+    }
+    
+  }
+  
+  @Override
+  public OutputStream getEncryptingOutputStream(OutputStream out, Map<String,String> conf, Map<CryptoModule.CryptoInitProperty,Object> cryptoInitParams) {
+    
+    log.debug("Initializing crypto output stream");
+    
+    String cipherSuite = conf.get(Property.CRYPTO_CIPHER_SUITE.getKey());
+    
+    if (cipherSuite.equals("NullCipher")) {
+      return out;
+    }
+    
+    String algorithmName = conf.get(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey());
+    String secureRNG = conf.get(Property.CRYPTO_SECURE_RNG.getKey());
+    String secureRNGProvider = conf.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey());
+    int keyLength = Integer.parseInt(conf.get(Property.CRYPTO_CIPHER_KEY_LENGTH.getKey()));
+    String keyStrategyName = conf.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey());
+    
+    log.debug(String.format(
+        "Using cipher suite \"%s\" (algorithm \"%s\") with key length %d with RNG \"%s\" and RNG provider \"%s\" and key encryption strategy %s", cipherSuite,
+        algorithmName, keyLength, secureRNG, secureRNGProvider, keyStrategyName));
+    
+    SecureRandom secureRandom = DefaultCryptoModuleUtils.getSecureRandom(secureRNG, secureRNGProvider);
+    Cipher cipher = DefaultCryptoModuleUtils.getCipher(cipherSuite);
+    byte[] randomKey = (byte[]) cryptoInitParams.get(CryptoInitProperty.PLAINTEXT_SESSION_KEY);
+    byte[] initVector = (byte[]) cryptoInitParams.get(CryptoInitProperty.INITIALIZATION_VECTOR);
+    
+    // If they pass us an IV, use it...
+    if (initVector != null) {
+      
+      try {
+        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(randomKey, algorithmName), new IvParameterSpec(initVector));
+      } catch (InvalidKeyException e) {
+        log.error("Accumulo encountered an unknown error in generating the secret key object (SecretKeySpec) for an encrypted stream");
+        throw new RuntimeException(e);
+      } catch (InvalidAlgorithmParameterException e) {
+        log.error("Accumulo encountered an unknown error in generating the secret key object (SecretKeySpec) for an encrypted stream");
+        throw new RuntimeException(e);
+      }
+      
+    } else {
+      // We didn't get an IV, so we'll let the cipher make one for us and then put its value back into the map so
+      // that the caller has access to it, to persist it.
+      try {
+        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(randomKey, algorithmName), secureRandom);
+      } catch (InvalidKeyException e) {
+        log.error("Accumulo encountered an unknown error in generating the secret key object (SecretKeySpec) for the write-ahead log");
+        throw new RuntimeException(e);
+      }
+      
+      // Since the IV length is determined by the algorithm, we let the cipher generate our IV for us,
+      // rather than calling secure random directly.
+      initVector = cipher.getIV();
+      cryptoInitParams.put(CryptoInitProperty.INITIALIZATION_VECTOR, initVector);
+    }
+    
+    CipherOutputStream cipherOutputStream = new CipherOutputStream(out, cipher);
+    BufferedOutputStream bufferedCipherOutputStream = new BufferedOutputStream(cipherOutputStream);
+    
+    return bufferedCipherOutputStream;
+  }
+  
+  @Override
+  public InputStream getDecryptingInputStream(InputStream in, Map<String,String> cryptoOpts, Map<CryptoModule.CryptoInitProperty,Object> cryptoInitParams)
+      throws IOException {
+    String cipherSuite = cryptoOpts.get(Property.CRYPTO_CIPHER_SUITE.getKey());
+    String algorithmName = cryptoOpts.get(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey());
+    String cipherSuiteFromInitParams = (String) cryptoInitParams.get(CryptoInitProperty.CIPHER_SUITE);
+    String algorithmNameFromInitParams = (String) cryptoInitParams.get(CryptoInitProperty.ALGORITHM_NAME);
+    byte[] initVector = (byte[]) cryptoInitParams.get(CryptoInitProperty.INITIALIZATION_VECTOR);
+    byte[] secretKey = (byte[]) cryptoInitParams.get(CryptoInitProperty.PLAINTEXT_SESSION_KEY);
+    
+    if (initVector == null || secretKey == null || cipherSuiteFromInitParams == null || algorithmNameFromInitParams == null) {
+      log.error("Called getDecryptingInputStream() without proper crypto init params.  Need initVector, plaintext key, cipher suite and algorithm name");
+      throw new RuntimeException("Called getDecryptingInputStream() without initialization vector and/or plaintext session key");
+    }
+    
+    // Always use the init param's cipher suite, but check it against configured one and warn about discrepencies.
+    if (!cipherSuiteFromInitParams.equals(cipherSuite) || !algorithmNameFromInitParams.equals(algorithmName))
+      log.warn(String.format("Configured cipher suite and algorithm (\"%s\" and \"%s\") is different "
+          + "from cipher suite found in log file (\"%s\" and \"%s\")", cipherSuite, algorithmName, cipherSuiteFromInitParams, algorithmNameFromInitParams));
+    
+    Cipher cipher = DefaultCryptoModuleUtils.getCipher(cipherSuiteFromInitParams);
+    
+    try {
+      cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(secretKey, algorithmNameFromInitParams), new IvParameterSpec(initVector));
+    } catch (InvalidKeyException e) {
+      log.error("Error when trying to initialize cipher with secret key");
+      throw new RuntimeException(e);
+    } catch (InvalidAlgorithmParameterException e) {
+      log.error("Error when trying to initialize cipher with initialization vector");
+      throw new RuntimeException(e);
+    }
+    
+    BufferedInputStream bufferedDecryptingInputStream = new BufferedInputStream(new CipherInputStream(in, cipher));
+    
+    return bufferedDecryptingInputStream;
+    
+  }
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,56 @@
+package org.apache.accumulo.core.security.crypto;
+
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.SecureRandom;
+
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.NullCipher;
+
+import org.apache.log4j.Logger;
+
+public class DefaultCryptoModuleUtils {
+
+  private static final Logger log = Logger.getLogger(DefaultCryptoModuleUtils.class);
+  
+  public static SecureRandom getSecureRandom(String secureRNG, String secureRNGProvider) {
+    SecureRandom secureRandom = null;
+    try {
+      secureRandom = SecureRandom.getInstance(secureRNG, secureRNGProvider);
+      
+      // Immediately seed the generator
+      byte[] throwAway = new byte[16];
+      secureRandom.nextBytes(throwAway);
+      
+    } catch (NoSuchAlgorithmException e) {
+      log.error(String.format("Accumulo configuration file specified a secure random generator \"%s\" that was not found by any provider.", secureRNG));
+      throw new RuntimeException(e);
+    } catch (NoSuchProviderException e) {
+      log.error(String.format("Accumulo configuration file specified a secure random provider \"%s\" that does not exist", secureRNGProvider));
+      throw new RuntimeException(e);
+    }
+    return secureRandom;
+  }
+
+  public static Cipher getCipher(String cipherSuite) {
+    Cipher cipher = null;
+    
+    if (cipherSuite.equals("NullCipher")) {
+      cipher = new NullCipher();
+    } else {
+      try {
+        cipher = Cipher.getInstance(cipherSuite);
+      } catch (NoSuchAlgorithmException e) {
+        log.error(String.format("Accumulo configuration file contained a cipher suite \"%s\" that was not recognized by any providers", cipherSuite));
+        throw new RuntimeException(e);
+      } catch (NoSuchPaddingException e) {
+        log.error(String.format("Accumulo configuration file contained a cipher, \"%s\" with a padding that was not recognized by any providers"));
+        throw new RuntimeException(e);
+      }
+    }
+    return cipher;
+  }
+  
+  
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModuleUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultSecretKeyEncryptionStrategy.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultSecretKeyEncryptionStrategy.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultSecretKeyEncryptionStrategy.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultSecretKeyEncryptionStrategy.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.security.crypto;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public class DefaultSecretKeyEncryptionStrategy implements SecretKeyEncryptionStrategy {
+  
+  private static final Logger log = Logger.getLogger(DefaultSecretKeyEncryptionStrategy.class); 
+  
+  public static class DefaultSecretKeyEncryptionStrategyContext implements SecretKeyEncryptionStrategyContext {
+
+    private byte[] plaintextSecretKey;
+    private byte[] encryptedSecretKey;
+    private Map<String, String> context;
+    private String opaqueKeyId;
+    
+    @Override
+    public String getOpaqueKeyEncryptionKeyID() {
+      return opaqueKeyId;
+    }
+
+    @Override
+    public void setOpaqueKeyEncryptionKeyID(String id) {
+      this.opaqueKeyId = id;
+    }
+
+    @Override
+    public byte[] getPlaintextSecretKey() {
+      return plaintextSecretKey;
+    }
+
+    @Override
+    public void setPlaintextSecretKey(byte[] key) {
+      this.plaintextSecretKey = key;
+    }
+
+    @Override
+    public byte[] getEncryptedSecretKey() {
+      return encryptedSecretKey;
+    }
+
+    @Override
+    public void setEncryptedSecretKey(byte[] key) {
+      this.encryptedSecretKey = key;
+    }
+
+    @Override
+    public Map<String,String> getContext() {
+      return context;
+    }
+
+    @Override
+    public void setContext(Map<String,String> context) {
+      this.context = context;
+    }
+  }
+  
+  
+  @Override
+  public SecretKeyEncryptionStrategyContext encryptSecretKey(SecretKeyEncryptionStrategyContext context)  {
+    String hdfsURI = context.getContext().get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_HDFS_URI.getKey());
+    String pathToKeyName = context.getContext().get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getKey());
+    Path pathToKey = new Path(pathToKeyName);
+    
+    FileSystem fs = getHadoopFileSystem(hdfsURI);
+    try {
+      
+      doKeyEncryptionOperation(Cipher.ENCRYPT_MODE, context, pathToKeyName, pathToKey, fs);
+      
+    } catch (IOException e) {
+      log.error(e);
+      throw new RuntimeException(e);
+    }
+  
+    return context;
+    
+  }
+
+  private void initializeKeyEncryptingKey(FileSystem fs, Path pathToKey, SecretKeyEncryptionStrategyContext context) throws IOException {
+    Map<String, String> cryptoContext = context.getContext(); 
+    DataOutputStream out = fs.create(pathToKey);
+    // Very important, lets hedge our bets
+    fs.setReplication(pathToKey, (short) 5);
+    
+    // Write number of context entries
+    out.writeInt(cryptoContext.size());
+    
+    for (String key : cryptoContext.keySet()) {
+      out.writeUTF(key);
+      out.writeUTF(cryptoContext.get(key));
+    }
+    
+    SecureRandom random = DefaultCryptoModuleUtils.getSecureRandom(cryptoContext.get(Property.CRYPTO_SECURE_RNG.getKey()), cryptoContext.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey()));
+    int keyLength = Integer.parseInt(cryptoContext.get(Property.CRYPTO_CIPHER_KEY_LENGTH.getKey()));
+    byte[] newRandomKeyEncryptionKey = new byte[keyLength / 8];
+    
+    random.nextBytes(newRandomKeyEncryptionKey);
+    
+    Cipher cipher = DefaultCryptoModuleUtils.getCipher(cryptoContext.get(Property.CRYPTO_CIPHER_SUITE.getKey()));
+    try {
+      cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(newRandomKeyEncryptionKey, cryptoContext.get(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey())), random);
+    } catch (InvalidKeyException e) {
+      log.error(e);
+      throw new RuntimeException(e);
+    }
+    
+    byte[] initVector = cipher.getIV();
+    
+    out.writeInt(initVector.length);
+    out.write(initVector);
+    
+    out.writeInt(newRandomKeyEncryptionKey.length);
+    out.write(newRandomKeyEncryptionKey);
+    
+    out.flush();
+    out.close();
+    
+  }
+
+  private FileSystem getHadoopFileSystem(String hdfsURI) {
+    FileSystem fs = null;
+    
+    if (hdfsURI != null && !hdfsURI.equals("")) {
+      try {
+        fs = FileSystem.get(CachedConfiguration.getInstance());
+      } catch (IOException e) {
+        log.error(e);
+        throw new RuntimeException(e);
+      }
+    }
+    else {
+      try {
+        fs = FileSystem.get(new URI(hdfsURI), CachedConfiguration.getInstance());
+      } catch (URISyntaxException e) {
+        log.error(e);
+        throw new RuntimeException(e);
+      } catch (IOException e) {
+        log.error(e);
+        throw new RuntimeException(e);
+      }
+      
+      
+    }
+    return fs;
+  }
+  
+  @Override
+  public SecretKeyEncryptionStrategyContext decryptSecretKey(SecretKeyEncryptionStrategyContext context) {
+    String hdfsURI = context.getContext().get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_HDFS_URI.getKey());
+    String pathToKeyName = context.getContext().get(Property.CRYPTO_DEFAULT_KEY_STRATEGY_KEY_LOCATION.getKey());
+    Path pathToKey = new Path(pathToKeyName);
+    
+    FileSystem fs = getHadoopFileSystem(hdfsURI);
+    try {
+      doKeyEncryptionOperation(Cipher.DECRYPT_MODE, context, pathToKeyName, pathToKey, fs);
+      
+      
+    } catch (IOException e) {
+      log.error(e);
+      throw new RuntimeException(e);
+    }
+    
+    return context;
+  }
+
+  private void doKeyEncryptionOperation(int encryptionMode, SecretKeyEncryptionStrategyContext context, String pathToKeyName, Path pathToKey, FileSystem fs)
+      throws IOException {
+    DataInputStream in = null;
+    try {
+      if (!fs.exists(pathToKey)) {
+        
+        if (encryptionMode == Cipher.DECRYPT_MODE) {
+          log.error("There was a call to decrypt the session key but no key encryption key exists.  Either restore it, reconfigure the conf file to point to it in HDFS, or throw the affected data away and begin again.");
+          throw new RuntimeException("Could not find key encryption key file in configured location in HDFS ("+pathToKeyName+")");
+        } else {
+          initializeKeyEncryptingKey(fs, pathToKey, context);
+        }
+      }
+      in = fs.open(pathToKey);
+      
+      int numOfOpts = in.readInt();
+      Map<String, String> optsFromFile = new HashMap<String, String>();
+      
+      for (int i = 0; i < numOfOpts; i++) {
+        String key = in.readUTF();
+        String value = in.readUTF();
+        
+        optsFromFile.put(key, value);
+      }
+      
+      int ivLength = in.readInt();
+      byte[] iv = new byte[ivLength];
+      in.read(iv);
+      
+      
+      int keyEncryptionKeyLength = in.readInt();
+      byte[] keyEncryptionKey = new byte[keyEncryptionKeyLength];
+      in.read(keyEncryptionKey);
+      
+      Cipher cipher = DefaultCryptoModuleUtils.getCipher(optsFromFile.get(Property.CRYPTO_CIPHER_SUITE.getKey()));
+
+      try {
+        cipher.init(encryptionMode, new SecretKeySpec(keyEncryptionKey, optsFromFile.get(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey())), new IvParameterSpec(iv));
+      } catch (InvalidKeyException e) {
+        log.error(e);
+        throw new RuntimeException(e);
+      } catch (InvalidAlgorithmParameterException e) {
+        log.error(e);
+        throw new RuntimeException(e);
+      }
+
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+      CipherOutputStream cipherStream = new CipherOutputStream(byteArrayOutputStream, cipher);
+      
+      
+      if (Cipher.DECRYPT_MODE == encryptionMode) {
+        cipherStream.write(context.getEncryptedSecretKey());
+        cipherStream.flush();        
+        byte[] plaintextSecretKey = byteArrayOutputStream.toByteArray();
+
+        cipherStream.close();
+        
+        context.setPlaintextSecretKey(plaintextSecretKey);
+      } else {
+        cipherStream.write(context.getPlaintextSecretKey());
+        cipherStream.flush();        
+        byte[] encryptedSecretKey = byteArrayOutputStream.toByteArray();
+
+        cipherStream.close();
+        
+        context.setEncryptedSecretKey(encryptedSecretKey);
+        context.setOpaqueKeyEncryptionKeyID(pathToKeyName);
+      }
+      
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+  }
+
+  @Override
+  public SecretKeyEncryptionStrategyContext getNewContext() {
+    return new DefaultSecretKeyEncryptionStrategyContext();
+  }
+  
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultSecretKeyEncryptionStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security.crypto;
+
+/**
+ * 
+ */
+public interface SecretKeyEncryptionStrategy {
+  
+  public SecretKeyEncryptionStrategyContext encryptSecretKey(SecretKeyEncryptionStrategyContext context);
+  public SecretKeyEncryptionStrategyContext decryptSecretKey(SecretKeyEncryptionStrategyContext context);
+  public SecretKeyEncryptionStrategyContext getNewContext();
+  
+  
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategyContext.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategyContext.java?rev=1438786&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategyContext.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategyContext.java Sat Jan 26 00:13:36 2013
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.security.crypto;
+
+import java.util.Map;
+
+public interface SecretKeyEncryptionStrategyContext {
+  public String getOpaqueKeyEncryptionKeyID();
+  public void setOpaqueKeyEncryptionKeyID(String id);
+  public byte[] getPlaintextSecretKey();
+  public void setPlaintextSecretKey(byte[] key);
+  public byte[] getEncryptedSecretKey();
+  public void setEncryptedSecretKey(byte[] key);
+  public Map<String, String> getContext();
+  public void setContext(Map<String, String> context);
+}

Propchange: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/security/crypto/SecretKeyEncryptionStrategyContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1438786&r1=1438785&r2=1438786&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Sat Jan 26 00:13:36 2013
@@ -22,11 +22,14 @@ import static org.apache.accumulo.server
 import static org.apache.accumulo.server.logger.LogEvents.MANY_MUTATIONS;
 import static org.apache.accumulo.server.logger.LogEvents.OPEN;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -37,6 +40,8 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.crypto.CryptoModule;
+import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.logger.LogFileKey;
@@ -53,16 +58,19 @@ import org.apache.log4j.Logger;
  * 
  */
 public class DfsLogger {
+  // Package private so that LogSorter can find this
+  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  
   private static Logger log = Logger.getLogger(DfsLogger.class);
   
   public static class LogClosedException extends IOException {
     private static final long serialVersionUID = 1L;
-
+    
     public LogClosedException() {
       super("LogClosed");
     }
   }
-
+  
   public interface ServerResources {
     AccumuloConfiguration getConfiguration();
     
@@ -70,7 +78,7 @@ public class DfsLogger {
     
     Set<TServerInstance> getCurrentTServers();
   }
-
+  
   private LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
   
   private final Object closeLock = new Object();
@@ -80,9 +88,9 @@ public class DfsLogger {
   private static final LogFileValue EMPTY = new LogFileValue();
   
   private boolean closed = false;
-
+  
   private class LogSyncingTask implements Runnable {
-
+    
     @Override
     public void run() {
       ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
@@ -129,7 +137,7 @@ public class DfsLogger {
       }
     }
   }
-
+  
   static class LogWork {
     List<TabletMutations> mutations;
     CountDownLatch latch;
@@ -140,7 +148,7 @@ public class DfsLogger {
       this.latch = latch;
     }
   }
-
+  
   public static class LoggerOperation {
     private LogWork work;
     
@@ -165,8 +173,10 @@ public class DfsLogger {
       }
     }
   }
-
-  /* (non-Javadoc)
+  
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#equals(java.lang.Object)
    */
   @Override
@@ -179,7 +189,9 @@ public class DfsLogger {
     return false;
   }
   
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#hashCode()
    */
   @Override
@@ -190,6 +202,7 @@ public class DfsLogger {
   
   private ServerResources conf;
   private FSDataOutputStream logFile;
+  private DataOutputStream encryptingLogFile = null;
   private Path logPath;
   private String logger;
   
@@ -202,11 +215,13 @@ public class DfsLogger {
     this.logger = logger;
     this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
   }
-
+  
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
     logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
-
+    
+    log.debug("DfsLogger.open() begin");
+    
     logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()) + "/" + logger + "/" + filename);
     try {
       FileSystem fs = conf.getFileSystem();
@@ -220,6 +235,32 @@ public class DfsLogger {
       blockSize -= blockSize % checkSum;
       blockSize = Math.max(blockSize, checkSum);
       logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+      
+      // Initialize the crypto operations.
+      @SuppressWarnings("deprecation")
+      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
+      
+      // Initialize the log file with a header and the crypto params used to set up this log file.
+      logFile.writeUTF(LOG_FILE_HEADER_V2);
+      Map<String,String> cryptoOpts = conf.getConfiguration().getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
+      
+      logFile.writeInt(cryptoOpts.size());
+      for (String key : cryptoOpts.keySet()) {
+        logFile.writeUTF(key);
+        logFile.writeUTF(cryptoOpts.get(key));
+      }
+      
+      @SuppressWarnings("deprecation")
+      OutputStream encipheringOutputStream = cryptoModule.getEncryptingOutputStream(logFile, cryptoOpts);
+      
+      // If the module just kicks back our original stream, then just use it, don't wrap it in
+      // another data OutputStream.
+      if (encipheringOutputStream == logFile) {
+        encryptingLogFile = logFile;
+      } else {
+        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
+      }
+      
       LogFileKey key = new LogFileKey();
       key.event = OPEN;
       key.tserverSession = filename;
@@ -238,7 +279,9 @@ public class DfsLogger {
     t.start();
   }
   
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#toString()
    */
   @Override
@@ -273,7 +316,7 @@ public class DfsLogger {
           log.info("Interrupted");
         }
     }
-
+    
     if (logFile != null)
       try {
         logFile.close();
@@ -305,10 +348,10 @@ public class DfsLogger {
    * @throws IOException
    */
   private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
-    key.write(logFile);
-    value.write(logFile);
+    key.write(encryptingLogFile);
+    value.write(encryptingLogFile);
   }
-
+  
   public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
     return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
   }
@@ -332,16 +375,16 @@ public class DfsLogger {
         work.exception = e;
       }
     }
-
+    
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue does not need
       // to wait on walog I/O operations
-
+      
       if (closed)
         throw new LogClosedException();
       workQueue.add(work);
     }
-
+    
     return new LoggerOperation(work);
   }
   

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1438786&r1=1438785&r2=1438786&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Sat Jan 26 00:13:36 2013
@@ -16,8 +16,10 @@
  */
 package org.apache.accumulo.server.tabletserver.log;
 
+import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -32,6 +34,8 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.RecoveryStatus;
+import org.apache.accumulo.core.security.crypto.CryptoModule;
+import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -51,16 +55,16 @@ import org.apache.zookeeper.KeeperExcept
  */
 public class LogSorter {
   
-
   private static final Logger log = Logger.getLogger(LogSorter.class);
   FileSystem fs;
   AccumuloConfiguration conf;
   
   private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
-
+  
   class LogProcessor implements Processor {
     
     private FSDataInputStream input;
+    private DataInputStream decryptingInput;
     private long bytesCopied = -1;
     private long sortStart = 0;
     private long sortStop = -1;
@@ -92,23 +96,67 @@ public class LogSorter {
     }
     
     public void sort(String name, Path srcPath, String destPath) {
-
+      
       synchronized (this) {
         sortStart = System.currentTimeMillis();
       }
-
+      
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
         
         // the following call does not throw an exception if the file/dir does not exist
         fs.delete(new Path(destPath), true);
-
+        
         FSDataInputStream tmpInput = fs.open(srcPath);
-        synchronized (this) {
-          this.input = tmpInput;
+        DataInputStream tmpDecryptingInput = tmpInput;
+        
+        String logHeader = tmpInput.readUTF();
+        Map<String,String> cryptoOpts = new HashMap<String,String>();
+        
+        if (!logHeader.equals(DfsLogger.LOG_FILE_HEADER_V2)) {
+          
+          log.debug("Not a V2 log file, so re-opening it and passing it on");
+          
+          // Hmmm, this isn't the log file I was expecting, so close it and reopen to unread those bytes.
+          tmpInput.close();
+          tmpInput = fs.open(srcPath);
+          
+          synchronized (this) {
+            this.input = tmpInput;
+            this.decryptingInput = tmpInput;
+          }
+          
+        } else {
+          
+          int numEntries = tmpInput.readInt();
+          for (int i = 0; i < numEntries; i++) {
+            cryptoOpts.put(tmpInput.readUTF(), tmpInput.readUTF());
+          }
+          
+          String cryptoModuleName = cryptoOpts.get(Property.CRYPTO_MODULE_CLASS.getKey());
+          if (cryptoModuleName == null) {
+            // If for whatever reason we didn't get a configured crypto module (old log file version, for instance)
+            // default to using the default configuration entry (usually NullCipher).
+            cryptoModuleName = AccumuloConfiguration.getDefaultConfiguration().get(Property.CRYPTO_MODULE_CLASS);
+          }
+          
+          synchronized (this) {
+            this.input = tmpInput;
+          }
+          
+          @SuppressWarnings("deprecation")
+          CryptoModule cryptoOps = CryptoModuleFactory.getCryptoModule(cryptoModuleName);
+          @SuppressWarnings("deprecation")
+          InputStream decryptingInputStream = cryptoOps.getDecryptingInputStream(input, cryptoOpts);
+          
+          tmpDecryptingInput = new DataInputStream(decryptingInputStream);
+          
+          synchronized (this) {
+            this.decryptingInput = tmpDecryptingInput;
+          }
         }
-
+        
         final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");
         while (true) {
@@ -118,8 +166,8 @@ public class LogSorter {
             while (input.getPos() - start < bufferSize) {
               LogFileKey key = new LogFileKey();
               LogFileValue value = new LogFileValue();
-              key.readFields(input);
-              value.readFields(input);
+              key.readFields(decryptingInput);
+              value.readFields(decryptingInput);
               buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
             }
             writeBuffer(destPath, buffer, part++);
@@ -170,10 +218,11 @@ public class LogSorter {
         output.close();
       }
     }
-
+    
     synchronized void close() throws IOException {
       bytesCopied = input.getPos();
       input.close();
+      decryptingInput.close();
       input = null;
     }
     
@@ -201,7 +250,7 @@ public class LogSorter {
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
   }
-
+  
   public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
     this.threadPool = distWorkQThreadPool;
     new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);



Mime
View raw message