accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1339223 - /accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
Date Wed, 16 May 2012 15:20:12 GMT
Author: vines
Date: Wed May 16 15:20:11 2012
New Revision: 1339223

URL: http://svn.apache.org/viewvc?rev=1339223&view=rev
Log:
ACCUMULO-489 - extending password fix to the OutputFormat


Modified:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1339223&r1=1339222&r2=1339223&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
(original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
Wed May 16 15:20:11 2012
@@ -38,6 +38,13 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -68,7 +75,7 @@ public class AccumuloOutputFormat extend
   private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD = PREFIX + ".password";
+  private static final String PASSWORD_PATH = PREFIX + ".password";
   private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
   
   private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -101,9 +108,11 @@ public class AccumuloOutputFormat extend
    *          the output format will create new tables as necessary. Table names can only
be alpha-numeric and underscores.
    * @param defaultTable
    *          the table to use when the tablename is null in the write call
+   * @throws IOException
+   *           Thrown when there are issues creating the file distributed with the users
info
    * @deprecated Use {@link #setOutputInfo(Configuration,String,byte[],boolean,String)} instead
    */
-  public static void setOutputInfo(JobContext job, String user, byte[] passwd, boolean createTables,
String defaultTable) {
+  public static void setOutputInfo(JobContext job, String user, byte[] passwd, boolean createTables,
String defaultTable) throws IOException {
     setOutputInfo(job.getConfiguration(), user, passwd, createTables, defaultTable);
   }
   
@@ -120,18 +129,34 @@ public class AccumuloOutputFormat extend
    *          the output format will create new tables as necessary. Table names can only
be alpha-numeric and underscores.
    * @param defaultTable
    *          the table to use when the tablename is null in the write call
+   * @throws IOException
+   *           Thrown when there are issues creating the file distributed with the users
info
    */
-  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean
createTables, String defaultTable) {
+  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean
createTables, String defaultTable) throws IOException {
     if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
       throw new IllegalStateException("Output info can only be set once per job");
     conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
     
     ArgumentChecker.notNull(user, passwd);
     conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.setBoolean(CREATETABLES, createTables);
     if (defaultTable != null)
       conf.set(DEFAULT_TABLE_NAME, defaultTable);
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis()
+ ".pw");
+    conf.set(PASSWORD_PATH, file.toString());
+    FSDataOutputStream fos = fs.create(file, false);
+    fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    fs.deleteOnExit(file);
+    
+    byte[] encodedPw = Base64.encodeBase64(passwd);
+    fos.writeInt(encodedPw.length);
+    fos.write(encodedPw);
+    fos.close();
+    
+    DistributedCache.addCacheFile(file.toUri(), conf);
+
   }
   
   /**
@@ -232,21 +257,28 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks;
It is BASE64 encoded to provide a charset safe conversion to a
-   * string, and is not intended to be secure.
+   * @throws IOException
    * 
    * @deprecated Use {@link #getPassword(Configuration)} instead
    */
-  protected static byte[] getPassword(JobContext job) {
+  protected static byte[] getPassword(JobContext job) throws IOException {
     return getPassword(job.getConfiguration());
   }
   
   /**
-   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks;
It is BASE64 encoded to provide a charset safe conversion to a
-   * string, and is not intended to be secure.
+   * @throws IOException
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static byte[] getPassword(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(conf.get(PASSWORD_PATH));
+    
+    FSDataInputStream fdis = fs.open(file);
+    int length = fdis.readInt();
+    byte[] encodedPassword = new byte[length];
+    fdis.read(encodedPassword);
+    fdis.close();
+    
+    return Base64.decodeBase64(encodedPassword);
   }
   
   /**
@@ -354,7 +386,7 @@ public class AccumuloOutputFormat extend
     
     private Connector conn;
     
-    AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException
{
+    AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException,
IOException {
       Level l = getLogLevel(attempt);
       if (l != null)
         log.setLevel(getLogLevel(attempt));



Mime
View raw message