accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject svn commit: r1329342 - in /accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce: AccumuloOutputFormat.java InputFormatBase.java
Date Mon, 23 Apr 2012 17:22:36 GMT
Author: billie
Date: Mon Apr 23 17:22:35 2012
New Revision: 1329342

URL: http://svn.apache.org/viewvc?rev=1329342&view=rev
Log:
ACCUMULO-532 made input and output formats more extendable

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1329342&r1=1329341&r2=1329342&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Mon Apr 23 17:22:35 2012
@@ -142,7 +142,7 @@ public class AccumuloOutputFormat extend
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
     conf.setInt(NUM_WRITE_THREADS, numberOfThreads);
   }
-
+  
   public static void setLogLevel(Configuration conf, Level level) {
     ArgumentChecker.notNull(level);
     conf.setInt(LOGLEVEL, level.toInt());
@@ -200,7 +200,7 @@ public class AccumuloOutputFormat extend
     return conf.getBoolean(SIMULATE, false);
   }
   
-  private static class AccumuloRecordWriter extends RecordWriter<Text,Mutation> {
+  protected static class AccumuloRecordWriter extends RecordWriter<Text,Mutation> {
     private MultiTableBatchWriter mtbw = null;
     private HashMap<Text,BatchWriter> bws = null;
     private Text defaultTableName = null;
@@ -213,25 +213,24 @@ public class AccumuloOutputFormat extend
     
     private Connector conn;
     
-    AccumuloRecordWriter(TaskAttemptContext attempt) throws AccumuloException, AccumuloSecurityException {
-      Level l = getLogLevel(attempt.getConfiguration());
+    protected AccumuloRecordWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException {
+      Level l = getLogLevel(conf);
       if (l != null)
-        log.setLevel(getLogLevel(attempt.getConfiguration()));
-      this.simulate = getSimulationMode(attempt.getConfiguration());
-      this.createTables = canCreateTables(attempt.getConfiguration());
+        log.setLevel(getLogLevel(conf));
+      this.simulate = getSimulationMode(conf);
+      this.createTables = canCreateTables(conf);
       
       if (simulate)
         log.info("Simulating output only. No writes to tables will occur");
       
       this.bws = new HashMap<Text,BatchWriter>();
       
-      String tname = getDefaultTableName(attempt.getConfiguration());
+      String tname = getDefaultTableName(conf);
       this.defaultTableName = (tname == null) ? null : new Text(tname);
       
       if (!simulate) {
-        this.conn = getInstance(attempt.getConfiguration()).getConnector(getUsername(attempt.getConfiguration()), getPassword(attempt.getConfiguration()));
-        mtbw = conn.createMultiTableBatchWriter(getMaxMutationBufferSize(attempt.getConfiguration()), getMaxLatency(attempt.getConfiguration()),
-            getMaxWriteThreads(attempt.getConfiguration()));
+        this.conn = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
+        mtbw = conn.createMultiTableBatchWriter(getMaxMutationBufferSize(conf), getMaxLatency(conf), getMaxWriteThreads(conf));
       }
     }
     
@@ -355,14 +354,17 @@ public class AccumuloOutputFormat extend
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    Configuration conf = job.getConfiguration();
+    checkOutputSpecs(job.getConfiguration());
+  }
+  
+  public void checkOutputSpecs(Configuration conf) throws IOException {
     if (!conf.getBoolean(OUTPUT_INFO_HAS_BEEN_SET, false))
       throw new IOException("Output info has not been set.");
     if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
       throw new IOException("Instance info has not been set.");
     try {
-      Connector c = getInstance(job.getConfiguration()).getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration()));
-      if (!c.securityOperations().authenticateUser(getUsername(job.getConfiguration()), getPassword(job.getConfiguration())))
+      Connector c = getInstance(conf).getConnector(getUsername(conf), getPassword(conf));
+      if (!c.securityOperations().authenticateUser(getUsername(conf), getPassword(conf)))
         throw new IOException("Unable to authenticate user");
     } catch (AccumuloException e) {
       throw new IOException(e);
@@ -379,7 +381,7 @@ public class AccumuloOutputFormat extend
   @Override
   public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
     try {
-      return new AccumuloRecordWriter(attempt);
+      return new AccumuloRecordWriter(attempt.getConfiguration());
     } catch (Exception e) {
       throw new IOException(e);
     }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1329342&r1=1329341&r2=1329342&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Mon Apr 23 17:22:35 2012
@@ -139,7 +139,7 @@ public abstract class InputFormatBase<K,
   private static final String ITERATORS_DELIM = ",";
   
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
-
+  
   /**
    * Enable or disable use of the {@link IsolatedScanner} in this configuration object. By default it is not enabled.
    * 
@@ -610,9 +610,9 @@ public abstract class InputFormatBase<K,
   protected static boolean isOfflineScan(Configuration conf) {
     return conf.getBoolean(READ_OFFLINE, false);
   }
-
+  
   // Return a list of the iterator settings (for iterators to apply to a scanner)
-
+  
   /**
    * Gets a list of the iterator settings (for iterators to apply to a scanner) from this configuration.
    * 
@@ -716,41 +716,45 @@ public abstract class InputFormatBase<K,
      * Initialize a scanner over the given input split using this task attempt configuration.
      */
     public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+      initialize(inSplit, attempt.getConfiguration());
+    }
+    
+    public void initialize(InputSplit inSplit, Configuration conf) throws IOException {
       Scanner scanner;
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.range);
-      Instance instance = getInstance(attempt.getConfiguration());
-      String user = getUsername(attempt.getConfiguration());
-      byte[] password = getPassword(attempt.getConfiguration());
-      Authorizations authorizations = getAuthorizations(attempt.getConfiguration());
+      Instance instance = getInstance(conf);
+      String user = getUsername(conf);
+      byte[] password = getPassword(conf);
+      Authorizations authorizations = getAuthorizations(conf);
       
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, password);
-        log.debug("Creating scanner for table: " + getTablename(attempt.getConfiguration()));
+        log.debug("Creating scanner for table: " + getTablename(conf));
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(attempt.getConfiguration())) {
+        if (isOfflineScan(conf)) {
           scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
-              getTablename(attempt.getConfiguration())), authorizations);
+              getTablename(conf)), authorizations);
         } else {
-          scanner = conn.createScanner(getTablename(attempt.getConfiguration()), authorizations);
+          scanner = conn.createScanner(getTablename(conf), authorizations);
         }
-        if (isIsolated(attempt.getConfiguration())) {
+        if (isIsolated(conf)) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(attempt.getConfiguration())) {
+        if (usesLocalIterators(conf)) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupMaxVersions(attempt.getConfiguration(), scanner);
-        setupIterators(attempt.getConfiguration(), scanner);
+        setupMaxVersions(conf, scanner);
+        setupIterators(conf, scanner);
       } catch (Exception e) {
         throw new IOException(e);
       }
       
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(attempt.getConfiguration())) {
+      for (Pair<Text,Text> c : getFetchedColumns(conf)) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -792,13 +796,13 @@ public abstract class InputFormatBase<K,
     }
   }
   
-  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(JobContext job, String tableName, List<Range> ranges) throws TableNotFoundException,
+  Map<String,Map<KeyExtent,List<Range>>> binOfflineTable(Configuration conf, String tableName, List<Range> ranges) throws TableNotFoundException,
       AccumuloException, AccumuloSecurityException {
     
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-
-    Instance instance = getInstance(job.getConfiguration());
-    Connector conn = instance.getConnector(getUsername(job.getConfiguration()), getPassword(job.getConfiguration()));
+    
+    Instance instance = getInstance(conf);
+    Connector conn = instance.getConnector(getUsername(conf), getPassword(conf));
     String tableId = Tables.getTableId(instance, tableName);
     
     if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
@@ -807,7 +811,7 @@ public abstract class InputFormatBase<K,
         throw new AccumuloException("Table is online " + tableName + "(" + tableId + ") cannot scan table in offline mode ");
       }
     }
-
+    
     for (Range range : ranges) {
       Text startRow;
       
@@ -827,9 +831,9 @@ public abstract class InputFormatBase<K,
       RowIterator rowIter = new RowIterator(scanner);
       
       // TODO check that extents match prev extent
-
+      
       KeyExtent lastExtent = null;
-
+      
       while (rowIter.hasNext()) {
         Iterator<Entry<Key,Value>> row = rowIter.next();
         String last = "";
@@ -848,7 +852,7 @@ public abstract class InputFormatBase<K,
               || key.getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
             location = entry.getValue().toString();
           }
-
+          
           if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
             extent = new KeyExtent(key.getRow(), entry.getValue());
           }
@@ -857,15 +861,15 @@ public abstract class InputFormatBase<K,
         
         if (location != null)
           return null;
-
+        
         if (!extent.getTableId().toString().equals(tableId)) {
           throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent);
         }
-
+        
         if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) {
           throw new AccumuloException(" " + lastExtent + " is not previous extent " + extent);
         }
-
+        
         Map<KeyExtent,List<Range>> tabletRanges = binnedRanges.get(last);
         if (tabletRanges == null) {
           tabletRanges = new HashMap<KeyExtent,List<Range>>();
@@ -879,29 +883,33 @@ public abstract class InputFormatBase<K,
         }
         
         rangeList.add(range);
-
+        
         if (extent.getEndRow() == null || range.afterEndKey(new Key(extent.getEndRow()).followingKey(PartialKey.ROW))) {
           break;
         }
         
         lastExtent = extent;
       }
-
+      
     }
     
     return binnedRanges;
   }
-
+  
   /**
    * Read the metadata table to get tablets and match up ranges to them.
    */
   public List<InputSplit> getSplits(JobContext job) throws IOException {
-    log.setLevel(getLogLevel(job.getConfiguration()));
-    validateOptions(job.getConfiguration());
+    return getSplits(job.getConfiguration());
+  }
+  
+  public List<InputSplit> getSplits(Configuration conf) throws IOException {
+    log.setLevel(getLogLevel(conf));
+    validateOptions(conf);
     
-    String tableName = getTablename(job.getConfiguration());
-    boolean autoAdjust = getAutoAdjustRanges(job.getConfiguration());
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job.getConfiguration())) : getRanges(job.getConfiguration());
+    String tableName = getTablename(conf);
+    boolean autoAdjust = getAutoAdjustRanges(conf);
+    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(conf)) : getRanges(conf);
     
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
@@ -912,17 +920,17 @@ public abstract class InputFormatBase<K,
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     TabletLocator tl;
     try {
-      if (isOfflineScan(job.getConfiguration())) {
-        binnedRanges = binOfflineTable(job, tableName, ranges);
+      if (isOfflineScan(conf)) {
+        binnedRanges = binOfflineTable(conf, tableName, ranges);
         while (binnedRanges == null) {
           // Some tablets were still online, try again
           UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-          binnedRanges = binOfflineTable(job, tableName, ranges);
+          binnedRanges = binOfflineTable(conf, tableName, ranges);
         }
       } else {
-        Instance instance = getInstance(job.getConfiguration());
+        Instance instance = getInstance(conf);
         String tableId = null;
-        tl = getTabletLocator(job.getConfiguration());
+        tl = getTabletLocator(conf);
         while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
           if (!(instance instanceof MockInstance)) {
             if (tableId == null)
@@ -994,10 +1002,19 @@ public abstract class InputFormatBase<K,
       locations = new String[0];
     }
     
+    public RangeInputSplit(RangeInputSplit split) throws IOException {
+      this.setRange(split.getRange());
+      this.setLocations(split.getLocations());
+    }
+    
     public Range getRange() {
       return range;
     }
     
+    public void setRange(Range range) {
+      this.range = range;
+    }
+    
     private static byte[] extractBytes(ByteSequence seq, int numBytes) {
       byte[] bytes = new byte[numBytes + 1];
       bytes[0] = 0;
@@ -1068,6 +1085,10 @@ public abstract class InputFormatBase<K,
       return locations;
     }
     
+    public void setLocations(String[] locations) {
+      this.locations = locations;
+    }
+    
     public void readFields(DataInput in) throws IOException {
       range.readFields(in);
       int numLocs = in.readInt();



Mime
View raw message