accumulo-commits mailing list archives

From vi...@apache.org
Subject svn commit: r1336322 - /accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
Date Wed, 09 May 2012 18:08:29 GMT
Author: vines
Date: Wed May  9 18:08:29 2012
New Revision: 1336322

URL: http://svn.apache.org/viewvc?rev=1336322&view=rev
Log:
ACCUMULO-489 - We now write the password to a read-restricted file, distribute it with the private
distributed cache (new as of Hadoop 0.20.203.0), and then read it back in the distributed tasks. This
prevents the password from sitting in plaintext in the job Configuration, because file system
permissions hide the data.
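
For readers skimming the diff, here is a minimal standalone sketch of the write side of this
pattern. It is illustrative only, assuming a Hadoop 0.20.203+ classpath; the class name
PasswordFileSketch and the helper writePasswordFile are hypothetical, not part of the Accumulo API.

import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public class PasswordFileSketch {

  // Hypothetical helper mirroring the setInputInfo() changes below.
  static void writePasswordFile(Configuration conf, byte[] passwd) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    // One file per job submission; the timestamp keeps names unique.
    Path file = new Path(fs.getWorkingDirectory(),
        conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");

    FSDataOutputStream fos = fs.create(file, false);
    // Owner-only access (rwx------); this is what actually hides the data.
    fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
    // Clean up the file when the client JVM exits.
    fs.deleteOnExit(file);

    // Length-prefixed, Base64-encoded payload so the reader knows how
    // many bytes to expect.
    byte[] encodedPw = Base64.encodeBase64(passwd);
    fos.writeInt(encodedPw.length);
    fos.write(encodedPw);
    fos.close();

    // Register the file with the distributed cache so tasks can read it.
    DistributedCache.addCacheFile(file.toUri(), conf);
  }
}

Note that the distributed cache merely localizes the file for each task; the owner-only
permission on the file is what keeps the Base64-encoded bytes private.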

Modified:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1336322&r1=1336321&r2=1336322&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Wed May  9 18:08:29 2012
@@ -81,6 +81,13 @@ import org.apache.accumulo.core.util.Tex
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -115,7 +122,7 @@ public abstract class InputFormatBase<K,
   private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD = PREFIX + ".password";
+  private static final String PASSWORD_PATH = PREFIX + ".password";
   private static final String TABLE_NAME = PREFIX + ".tablename";
   private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
   
@@ -187,9 +194,10 @@ public abstract class InputFormatBase<K,
   }
   
   /**
+   * @throws IOException
   * @deprecated Use {@link #setInputInfo(Configuration,String,byte[],String,Authorizations)} instead
    */
-  public static void setInputInfo(JobContext job, String user, byte[] passwd, String table, Authorizations auths) {
+  public static void setInputInfo(JobContext job, String user, byte[] passwd, String table, Authorizations auths) throws IOException {
     setInputInfo(job.getConfiguration(), user, passwd, table, auths);
   }
   
@@ -206,18 +214,32 @@ public abstract class InputFormatBase<K,
    *          the table to read
    * @param auths
    *          the authorizations used to restrict data read
+   * @throws IOException
    */
-  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
+  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) throws IOException {
     if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
       throw new IllegalStateException("Input info can only be set once per job");
     conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
     
     ArgumentChecker.notNull(user, passwd, table);
     conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.set(TABLE_NAME, table);
     if (auths != null && !auths.isEmpty())
       conf.set(AUTHORIZATIONS, auths.serialize());
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
+    conf.set(PASSWORD_PATH, file.toString());
+    FSDataOutputStream fos = fs.create(file, false);
+    fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    fs.deleteOnExit(file);
+
+    byte[] encodedPw = Base64.encodeBase64(passwd);
+    fos.writeInt(encodedPw.length);
+    fos.write(encodedPw);
+    fos.close();
+    
+    DistributedCache.addCacheFile(file.toUri(), conf);
   }
   
   /**
@@ -632,26 +654,31 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
-   * string, and is not intended to be secure.
-   * 
+   * @throws IOException
    * @deprecated Use {@link #getPassword(Configuration)} instead
    */
-  protected static byte[] getPassword(JobContext job) {
+  protected static byte[] getPassword(JobContext job) throws IOException {
     return getPassword(job.getConfiguration());
   }
   
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
-   * 
    * @param conf
    *          the Hadoop configuration object
    * @return the BASE64-encoded password
+   * @throws IOException
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static byte[] getPassword(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(conf.get(PASSWORD_PATH));
+    
+    FSDataInputStream fdis = fs.open(file);
+    int length = fdis.readInt();
+    byte[] encodedPassword = new byte[length];
+    fdis.read(encodedPassword);
+    fdis.close();
+    
+    return Base64.decodeBase64(encodedPassword);
   }
   
   /**
@@ -718,7 +745,7 @@ public abstract class InputFormatBase<K,
   /**
    * @deprecated Use {@link #getTabletLocator(Configuration)} instead
    */
-  protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException {
+  protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException, IOException {
     return getTabletLocator(job.getConfiguration());
   }
   
@@ -730,8 +757,10 @@ public abstract class InputFormatBase<K,
    * @return an accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
+   * @throws IOException
+   *           if the input format is unable to read the password file from the FileSystem
    */
-  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
+  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
     if (conf.getBoolean(MOCK, false))
       return new MockTabletLocator();
     Instance instance = getInstance(conf);
@@ -1183,7 +1212,7 @@ public abstract class InputFormatBase<K,
   }
   
  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
-      AccumuloException, AccumuloSecurityException {
+      AccumuloException, AccumuloSecurityException, IOException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
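
The read side, as each task would recover the password from the cached file, is sketched below.
Again a hypothetical sketch rather than the committed code; one deliberate difference is that it
uses readFully, which blocks until the buffer is filled, where the patch's bare read(byte[]) may
return fewer bytes than requested.

import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PasswordFileReadSketch {

  // Hypothetical helper mirroring getPassword(Configuration) above;
  // passwordPathKey stands in for the PASSWORD_PATH configuration key.
  static byte[] readPassword(Configuration conf, String passwordPathKey) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(conf.get(passwordPathKey));

    FSDataInputStream fdis = fs.open(file);
    try {
      int length = fdis.readInt();
      byte[] encodedPassword = new byte[length];
      // readFully blocks until the buffer is filled, avoiding short reads.
      fdis.readFully(encodedPassword);
      return Base64.decodeBase64(encodedPassword);
    } finally {
      fdis.close();
    }
  }
}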
 


