accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1336348 - in /accumulo/trunk: ./ core/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java server/ src/
Date Wed, 09 May 2012 18:48:24 GMT
Author: vines
Date: Wed May  9 18:48:23 2012
New Revision: 1336348

URL: http://svn.apache.org/viewvc?rev=1336348&view=rev
Log:
Merging ACCUMULO-489


Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/src/   (props changed)

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1335109-1336328
  Merged /accumulo/branches/1.4:r1335109-1336328

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/core:r1335109-1336328
  Merged /accumulo/branches/1.4/src/core:r1335109-1336328

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1336348&r1=1336347&r2=1336348&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java	(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java	Wed May  9 18:48:23 2012
@@ -78,6 +78,13 @@ import org.apache.accumulo.core.util.Tex
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -112,7 +119,7 @@ public abstract class InputFormatBase<K,
   private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD = PREFIX + ".password";
+  private static final String PASSWORD_PATH = PREFIX + ".password";
   private static final String TABLE_NAME = PREFIX + ".tablename";
   private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
   
@@ -177,18 +184,32 @@ public abstract class InputFormatBase<K,
    *          the table to read
    * @param auths
    *          the authorizations used to restrict data read
+   * @throws IOException
    */
-  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) {
+  public static void setInputInfo(Configuration conf, String user, byte[] passwd, String table, Authorizations auths) throws IOException {
     if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
       throw new IllegalStateException("Input info can only be set once per job");
     conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true);
     
     ArgumentChecker.notNull(user, passwd, table);
     conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.set(TABLE_NAME, table);
     if (auths != null && !auths.isEmpty())
       conf.set(AUTHORIZATIONS, auths.serialize());
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
+    conf.set(PASSWORD_PATH, file.toString());
+    FSDataOutputStream fos = fs.create(file, false);
+    fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    fs.deleteOnExit(file);
+
+    byte[] encodedPw = Base64.encodeBase64(passwd);
+    fos.writeInt(encodedPw.length);
+    fos.write(encodedPw);
+    fos.close();
+    
+    DistributedCache.addCacheFile(file.toUri(), conf);
   }
   
   /**
@@ -422,16 +443,25 @@ public abstract class InputFormatBase<K,
   }
   
   /**
-   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
-   * provide a charset safe conversion to a string, and is not intended to be secure.
+   * Gets the password from the configuration.
    * 
    * @param conf
    *          the Hadoop configuration object
    * @return the BASE64-encoded password
+   * @throws IOException
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static byte[] getPassword(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(conf.get(PASSWORD_PATH));
+    
+    FSDataInputStream fdis = fs.open(file);
+    int length = fdis.readInt();
+    byte[] encodedPassword = new byte[length];
+    fdis.read(encodedPassword);
+    fdis.close();
+    
+    return Base64.decodeBase64(encodedPassword);
   }
   
   /**
@@ -482,8 +512,10 @@ public abstract class InputFormatBase<K,
    * @return an accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
+   * @throws IOException
+   *           if the input format is unable to read the password file from the FileSystem
    */
-  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
+  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
     if (conf.getBoolean(MOCK, false))
       return new MockTabletLocator();
     Instance instance = getInstance(conf);
@@ -797,7 +829,7 @@ public abstract class InputFormatBase<K,
   }
   
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(Configuration conf, String tableName, List<Range> ranges) throws TableNotFoundException,
-      AccumuloException, AccumuloSecurityException {
+      AccumuloException, AccumuloSecurityException, IOException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1335109-1336328
  Merged /accumulo/branches/1.4/src/server:r1335109-1336328

Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1335109-1336328
  Merged /accumulo/branches/1.4/src/src:r1335109-1336328



Mime
View raw message