accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1339230 - in /accumulo/trunk: ./ core/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java server/ src/
Date Wed, 16 May 2012 15:28:29 GMT
Author: vines
Date: Wed May 16 15:28:28 2012
New Revision: 1339230

URL: http://svn.apache.org/viewvc?rev=1339230&view=rev
Log:
ACCUMULO-489 - merging


Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/src/   (props changed)

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1336821-1339223
  Merged /accumulo/branches/1.4:r1339220-1339223

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/core:r1339220-1339223
  Merged /accumulo/branches/1.4/src/core:r1336821-1339223

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1339230&r1=1339229&r2=1339230&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Wed May 16 15:28:28 2012
@@ -38,6 +38,13 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -68,7 +75,7 @@ public class AccumuloOutputFormat extend
   private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD = PREFIX + ".password";
+  private static final String PASSWORD_PATH = PREFIX + ".password";
   private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
   
   private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -101,18 +108,34 @@ public class AccumuloOutputFormat extend
    *          the output format will create new tables as necessary. Table names can only be alpha-numeric and underscores.
    * @param defaultTable
    *          the table to use when the tablename is null in the write call
+   * @throws IOException
+   *           Thrown when there are issues creating the file distributed with the users info
    */
-  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
+  public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) throws IOException {
     if (conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
       throw new IllegalStateException("Output info can only be set once per job");
     conf.setBoolean(OUTPUT_INFO_HAS_BEEN_SET, true);
     
     ArgumentChecker.notNull(user, passwd);
     conf.set(USERNAME, user);
-    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.setBoolean(CREATETABLES, createTables);
     if (defaultTable != null)
       conf.set(DEFAULT_TABLE_NAME, defaultTable);
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
+    conf.set(PASSWORD_PATH, file.toString());
+    FSDataOutputStream fos = fs.create(file, false);
+    fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    fs.deleteOnExit(file);
+    
+    byte[] encodedPw = Base64.encodeBase64(passwd);
+    fos.writeInt(encodedPw.length);
+    fos.write(encodedPw);
+    fos.close();
+    
+    DistributedCache.addCacheFile(file.toUri(), conf);
+
   }
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
@@ -157,11 +180,19 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
-   * string, and is not intended to be secure.
+   * @throws IOException
    */
-  protected static byte[] getPassword(Configuration conf) {
-    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
+  protected static byte[] getPassword(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path file = new Path(conf.get(PASSWORD_PATH));
+    
+    FSDataInputStream fdis = fs.open(file);
+    int length = fdis.readInt();
+    byte[] encodedPassword = new byte[length];
+    fdis.read(encodedPassword);
+    fdis.close();
+    
+    return Base64.decodeBase64(encodedPassword);
   }
   
   protected static boolean canCreateTables(Configuration conf) {
@@ -213,7 +244,7 @@ public class AccumuloOutputFormat extend
     
     private Connector conn;
     
-    protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+    protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException, IOException {
       Level l = getLogLevel(conf);
       if (l != null)
         log.setLevel(getLogLevel(conf));

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1339220-1339223
  Merged /accumulo/branches/1.4/src/server:r1336821-1339223

Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1339220-1339223
  Merged /accumulo/branches/1.4/src/src:r1336821-1339223



Mime
View raw message