accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1401004 - in /accumulo/trunk: ./ assemble/ core/ core/src/main/java/org/apache/accumulo/core/client/mapreduce/ core/src/test/java/org/apache/accumulo/core/client/mapreduce/ fate/src/main/java/org/apache/accumulo/fate/ fate/src/main/java/or...
Date Mon, 22 Oct 2012 18:59:59 GMT
Author: kturner
Date: Mon Oct 22 18:59:59 2012
New Revision: 1401004

URL: http://svn.apache.org/viewvc?rev=1401004&view=rev
Log:
ACCUMULO-826 ACCUMULO-507 reverted changes that caused map reduce jobs to fail if the process
that started the job exited

Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/assemble/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java   (props changed)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java   (props changed)
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/src/   (props changed)

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1400976
  Merged /accumulo/branches/1.4:r1400976

Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src/assemble:r1400976
  Merged /accumulo/branches/1.4/assemble:r1400976

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/core:r1400976
  Merged /accumulo/branches/1.4/src/core:r1400976

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1401004&r1=1401003&r2=1401004&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Mon Oct 22 18:59:59 2012
@@ -40,13 +40,6 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -77,7 +70,7 @@ public class AccumuloOutputFormat extend
   private static final String OUTPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD_PATH = PREFIX + ".password";
+  private static final String PASSWORD = PREFIX + ".password";
   private static final String DEFAULT_TABLE_NAME = PREFIX + ".defaulttable";
   
   private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -119,28 +112,10 @@ public class AccumuloOutputFormat extend
     
     ArgumentChecker.notNull(user, passwd);
     conf.set(USERNAME, user);
+    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.setBoolean(CREATETABLES, createTables);
     if (defaultTable != null)
       conf.set(DEFAULT_TABLE_NAME, defaultTable);
-    
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
-      conf.set(PASSWORD_PATH, file.toString());
-      FSDataOutputStream fos = fs.create(file, false);
-      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      fs.deleteOnExit(file);
-      
-      byte[] encodedPw = Base64.encodeBase64(passwd);
-      fos.writeInt(encodedPw.length);
-      fos.write(encodedPw);
-      fos.close();
-      
-      DistributedCache.addCacheFile(file.toUri(), conf);
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-
   }
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
@@ -205,19 +180,11 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @throws IOException
+   * WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a
+   * string, and is not intended to be secure.
    */
-  protected static byte[] getPassword(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(conf.get(PASSWORD_PATH));
-    
-    FSDataInputStream fdis = fs.open(file);
-    int length = fdis.readInt();
-    byte[] encodedPassword = new byte[length];
-    fdis.read(encodedPassword);
-    fdis.close();
-    
-    return Base64.decodeBase64(encodedPassword);
+  protected static byte[] getPassword(Configuration conf) {
+    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
   }
   
   protected static boolean canCreateTables(Configuration conf) {

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1401004&r1=1401003&r2=1401004&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Oct 22 18:59:59 2012
@@ -74,13 +74,6 @@ import org.apache.accumulo.core.util.Tex
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -115,7 +108,7 @@ public abstract class InputFormatBase<K,
   private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured";
   private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured";
   private static final String USERNAME = PREFIX + ".username";
-  private static final String PASSWORD_PATH = PREFIX + ".password";
+  private static final String PASSWORD = PREFIX + ".password";
   private static final String TABLE_NAME = PREFIX + ".tablename";
   private static final String AUTHORIZATIONS = PREFIX + ".authorizations";
   
@@ -187,28 +180,10 @@ public abstract class InputFormatBase<K,
     
     ArgumentChecker.notNull(user, passwd, table);
     conf.set(USERNAME, user);
+    conf.set(PASSWORD, new String(Base64.encodeBase64(passwd)));
     conf.set(TABLE_NAME, table);
     if (auths != null && !auths.isEmpty())
       conf.set(AUTHORIZATIONS, auths.serialize());
-    
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".pw");
-      conf.set(PASSWORD_PATH, file.toString());
-      FSDataOutputStream fos = fs.create(file, false);
-      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      fs.deleteOnExit(file);
-      
-      byte[] encodedPw = Base64.encodeBase64(passwd);
-      fos.writeInt(encodedPw.length);
-      fos.write(encodedPw);
-      fos.close();
-      
-      DistributedCache.addCacheFile(file.toUri(), conf);
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-    
   }
   
   /**
@@ -255,24 +230,17 @@ public abstract class InputFormatBase<K,
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
     ArgumentChecker.notNull(ranges);
+    ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size());
     try {
-      FileSystem fs = FileSystem.get(conf);
-      Path file = new Path(fs.getWorkingDirectory(), conf.get("mapred.job.name") + System.currentTimeMillis() + ".ranges");
-      conf.set(RANGES, file.toString());
-      FSDataOutputStream fos = fs.create(file, false);
-      fs.setPermission(file, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      fs.deleteOnExit(file);
-      
-      fos.writeInt(ranges.size());
       for (Range r : ranges) {
-        r.write(fos);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        r.write(new DataOutputStream(baos));
+        rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray())));
       }
-      fos.close();
-      
-      DistributedCache.addCacheFile(file.toUri(), conf);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to write ranges to file", e);
+    } catch (IOException ex) {
+      throw new IllegalArgumentException("Unable to encode ranges to Base64", ex);
     }
+    conf.setStrings(RANGES, rangeStrings.toArray(new String[0]));
   }
   
   /**
@@ -441,26 +409,18 @@ public abstract class InputFormatBase<K,
     return conf.get(USERNAME);
   }
   
+  
   /**
-   * Gets the password from the configuration.
+   * Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
+   * provide a charset safe conversion to a string, and is not intended to be secure.
    * 
    * @param conf
    *          the Hadoop configuration object
    * @return the BASE64-encoded password
-   * @throws IOException
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
-  protected static byte[] getPassword(Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path file = new Path(conf.get(PASSWORD_PATH));
-    
-    FSDataInputStream fdis = fs.open(file);
-    int length = fdis.readInt();
-    byte[] encodedPassword = new byte[length];
-    fdis.read(encodedPassword);
-    fdis.close();
-    
-    return Base64.decodeBase64(encodedPassword);
+  protected static byte[] getPassword(Configuration conf) {
+    return Base64.decodeBase64(conf.get(PASSWORD, "").getBytes());
   }
   
   /**
@@ -511,10 +471,8 @@ public abstract class InputFormatBase<K,
    * @return an accumulo tablet locator
    * @throws TableNotFoundException
    *           if the table name set on the configuration doesn't exist
-   * @throws IOException
-   *           if the input format is unable to read the password file from the FileSystem
    */
-  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException {
+  protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException {
     if (conf.getBoolean(MOCK, false))
       return new MockTabletLocator();
     Instance instance = getInstance(conf);
@@ -537,21 +495,12 @@ public abstract class InputFormatBase<K,
    */
   protected static List<Range> getRanges(Configuration conf) throws IOException {
     ArrayList<Range> ranges = new ArrayList<Range>();
-    FileSystem fs = FileSystem.get(conf);
-    String rangePath = conf.get(RANGES);
-    if (rangePath == null)
-      return ranges;
-    Path file = new Path(rangePath);
-    
-    FSDataInputStream fdis = fs.open(file);
-    int numRanges = fdis.readInt();
-    while (numRanges > 0) {
-      Range r = new Range();
-      r.readFields(fdis);
-      ranges.add(r);
-      numRanges--;
+    for (String rangeString : conf.getStringCollection(RANGES)) {
+      ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes()));
+      Range range = new Range();
+      range.readFields(new DataInputStream(bais));
+      ranges.add(range);
     }
-    fdis.close();
     return ranges;
   }
   
@@ -809,7 +758,7 @@ public abstract class InputFormatBase<K,
   }
   
   Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(Configuration conf, String tableName, List<Range> ranges) throws TableNotFoundException,
-      AccumuloException, AccumuloSecurityException, IOException {
+      AccumuloException, AccumuloSecurityException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java?rev=1401004&r1=1401003&r2=1401004&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java Mon Oct 22 18:59:59 2012
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertTru
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -34,7 +32,6 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
@@ -108,24 +105,6 @@ public class AccumuloInputFormatTest {
     String iterators = conf.get("AccumuloInputFormat.iterators");
     assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators);
   }
-
-  static abstract class GetRanges<K, V> extends InputFormatBase<K,V> {
-    public static List<Range> getRanges(Configuration conf) throws IOException {
-      return InputFormatBase.getRanges(conf);
-    }
-  };
-
-  @Test
-  public void testSetRanges() throws IOException {
-    JobContext job = ContextFactory.createJobContext();
-    List<Range> ranges = new ArrayList<Range>();
-    for (int i = 0; i < 100000; i++) {
-      ranges.add(new Range(new Text(String.format("%05x", i))));
-    }
-    AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
-    List<Range> ranges2 = GetRanges.getRanges(job.getConfiguration());
-    assertEquals(ranges, ranges2);
-  }
   
   @Test
   public void testAddIterator() {
@@ -247,17 +226,14 @@ public class AccumuloInputFormatTest {
   
   static class TestMapper extends Mapper<Key,Value,Key,Value> {
     Key key = null;
-    int first = 0;
     int count = 0;
     
     @Override
     protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
       if (key != null)
         assertEquals(key.getRow().toString(), new String(v.get()));
-      else
-        first = Integer.parseInt(k.getRow().toString(), 16) - 1;
-      assertEquals(k.getRow(), new Text(String.format("%09x", first + count + 1)));
-      assertEquals(new String(v.get()), String.format("%09x", first + count));
+      assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
+      assertEquals(new String(v.get()), String.format("%09x", count));
       key = new Key(k);
       count++;
     }
@@ -282,14 +258,10 @@ public class AccumuloInputFormatTest {
     job.setNumReduceTasks(0);
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable", new Authorizations());
     AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
-    HashSet<Range> ranges = new HashSet<Range>();
-    ranges.add(new Range("000000000", "000000010"));
-    ranges.add(new Range("000000100", "000000110"));
-    AccumuloInputFormat.setRanges(job.getConfiguration(), ranges);
     
     AccumuloInputFormat input = new AccumuloInputFormat();
     List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 2);
+    assertEquals(splits.size(), 1);
     
     TestMapper mapper = (TestMapper) job.getMapperClass().newInstance();
     for (InputSplit split : splits) {
@@ -298,7 +270,6 @@ public class AccumuloInputFormatTest {
       Mapper<Key,Value,Key,Value>.Context context = ContextFactory.createMapContext(mapper, tac, reader, null, split);
       reader.initialize(split, context);
       mapper.run(context);
-      assertEquals(mapper.count, 16);
     }
   }
   
@@ -319,9 +290,7 @@ public class AccumuloInputFormatTest {
     AccumuloInputFormat.setInputInfo(job.getConfiguration(), "root", "".getBytes(), "testtable2", new Authorizations());
     AccumuloInputFormat.setMockInstance(job.getConfiguration(), "testmapinstance");
     AccumuloInputFormat input = new AccumuloInputFormat();
-    List<InputSplit> splits = input.getSplits(job);
-    assertEquals(splits.size(), 1);
-    RangeInputSplit ris = (RangeInputSplit) splits.get(0);
+    RangeInputSplit ris = new RangeInputSplit();
     TaskAttemptContext tac = ContextFactory.createTaskAttemptContext(job);
     RecordReader<Key,Value> rr = input.createRecordReader(ris, tac);
     rr.initialize(ris, tac);
@@ -332,6 +301,5 @@ public class AccumuloInputFormatTest {
     while (rr.nextKeyValue()) {
       mapper.map(rr.getCurrentKey(), rr.getCurrentValue(), (TestMapper.Context) context);
     }
-    assertEquals(mapper.count, 100);
   }
 }

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1400976
  Merged /accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1400976

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1400976
  Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1400976

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1400976
  Merged /accumulo/branches/1.4/src/server:r1400976

Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1400976
  Merged /accumulo/branches/1.4/src/src:r1400976



Mime
View raw message