accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [5/6] git commit: ACCUMULO-1854 Lift duplicated code between AIF and AOF into a helper class
Date Thu, 07 Nov 2013 05:24:26 GMT
ACCUMULO-1854 Lift duplicated code between AIF and AOF into a helper
class


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f10a6ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f10a6ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f10a6ff

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 0f10a6ffb0400424d30d3f49312bb500265cb276
Parents: 1fe2238
Author: Josh Elser <elserj@apache.org>
Authored: Wed Nov 6 17:51:06 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Nov 6 17:51:06 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AccumuloOutputFormat.java  | 130 +++----------
 .../core/client/mapreduce/InputFormatBase.java  | 184 +++++--------------
 .../client/mapreduce/SequencedFormatHelper.java | 145 +++++++++++++++
 .../mapreduce/AccumuloInputFormatTest.java      |   7 +-
 4 files changed, 214 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 5e5e43d..dd9762e 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -96,14 +96,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
   private static final int DEFAULT_NUM_WRITE_THREADS = 2;
   
-  private static final int DEFAULT_SEQUENCE = 0;
   private static final String SEQ_DELIM = ".";
   
-  private static final String COMMA = ",";
   private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredSeqs";
-  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
-  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
-  private static final String TRUE = "true";
 
 
 
@@ -113,17 +108,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
   public static synchronized int nextSequence(Configuration conf) {
-    String value = conf.get(CONFIGURED_SEQUENCES);
-    if (null == value) {
-      conf.set(CONFIGURED_SEQUENCES, "1");
-      return 1;
-    } else {
-      String[] splitValues = StringUtils.split(value, COMMA);
-      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
-      
-      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
-      return newValue;
-    }
+    return SequencedFormatHelper.nextSequence(conf, PREFIX);
   }
   
   /**
@@ -133,84 +118,11 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    * @throws NoSuchElementException
    */
   protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException
{
-    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
-    
-    // We haven't set anything, so we need to find the first to return
-    if (null == processedConfs || 0 == processedConfs.length) {
-      // Check to see if the default sequence was used
-      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
-      
-      // If so, set that we're processing it and return the value of the default
-      if (defaultSeqUsed) {
-        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
-        return DEFAULT_SEQUENCE;
-      }
-      
-      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
-      
-      // There was *nothing* loaded, fail.
-      if (null == loadedConfs || 0 == loadedConfs.length) {
-        throw new NoSuchElementException("Sequence was requested to process but none exist
to return");
-      }
-      
-      // We have loaded configuration(s), use the first
-      int firstLoaded = Integer.parseInt(loadedConfs[0]);
-      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
-      
-      return firstLoaded;
-    }
-    
-    // We've previously parsed some confs, need to find the next one to load
-    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
-    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
-    
-    // We only have the default sequence, no specifics.
-    // Getting here, we already know that we processed that default
-    if (null == configuredSequencesArray) {
-      return -1;
-    }
-
-    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length
+ 1);
-    
-    // If we used the default sequence ID, add that into the list of configured sequences
-    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
-      configuredSequences.add(DEFAULT_SEQUENCE);
-    }
-
-    // Add the rest of any sequences to our list
-    for (String configuredSequence : configuredSequencesArray) {
-      configuredSequences.add(Integer.parseInt(configuredSequence));
-    }
-    
-    int lastParsedSeqIndex = configuredSequences.size() - 1;
-    
-    // Find the next sequence number after the one we last processed
-    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
-      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
-      
-      if (lastLoadedValue == lastProcessedSeq) {
-        break;
-      }
-    }
-    
-    // We either had no sequences to match or we matched the last configured sequence
-    // Both of which are equivalent to no (more) sequences to process
-    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size())
{
-      return -1;
-    }
-    
-    // Get the value of the sequence at that offset
-    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
-    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
-    
-    return nextSequence;
+    return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
   }
   
   protected static void setDefaultSequenceUsed(Configuration conf) {
-    String value = conf.get(DEFAULT_SEQ_USED);
-    if (null == value || !TRUE.equals(value)) {
-      conf.setBoolean(DEFAULT_SEQ_USED, true);
-    }
+    SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX);
   }
 
   protected static String merge(String name, Integer sequence) {
@@ -266,7 +178,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
    */
   public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean
createTables, String defaultTable) {
     setDefaultSequenceUsed(conf);
-    setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
+    setOutputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, createTables,
defaultTable);
   }
   
   /**
@@ -306,7 +218,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String
zooKeepers) {
     setDefaultSequenceUsed(conf);
-    setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+    setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
   
   public static void setZooKeeperInstance(Configuration conf, int sequence, String instanceName,
String zooKeepers) {
@@ -329,7 +241,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setMockInstance(Configuration conf, String instanceName) {
     setDefaultSequenceUsed(conf);
-    setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+    setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName);
   }
   
   public static void setMockInstance(Configuration conf, int sequence, String instanceName)
{
@@ -347,7 +259,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
     setDefaultSequenceUsed(conf);
-    setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes);
+    setMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfBytes);
   }
   
   public static void setMaxMutationBufferSize(Configuration conf, int sequence, long numberOfBytes)
{
@@ -363,7 +275,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
     setDefaultSequenceUsed(conf);
-    setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds);
+    setMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfMilliseconds);
   }
   
   public static void setMaxLatency(Configuration conf, int sequence, int numberOfMilliseconds)
{
@@ -379,7 +291,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
     setDefaultSequenceUsed(conf);
-    setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads);
+    setMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, numberOfThreads);
   }
   
   public static void setMaxWriteThreads(Configuration conf, int sequence, int numberOfThreads)
{
@@ -395,7 +307,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setLogLevel(Configuration conf, Level level) {
     setDefaultSequenceUsed(conf);
-    setLogLevel(conf, DEFAULT_SEQUENCE, level);
+    setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level);
   }
   
   public static void setLogLevel(Configuration conf, int sequence, Level level) {
@@ -412,7 +324,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   
   public static void setSimulationMode(Configuration conf) {
     setDefaultSequenceUsed(conf);
-    setSimulationMode(conf, DEFAULT_SEQUENCE);
+    setSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   public static void setSimulationMode(Configuration conf, int sequence) {
@@ -427,7 +339,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static String getUsername(Configuration conf) {
-    return getUsername(conf, DEFAULT_SEQUENCE);
+    return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static String getUsername(Configuration conf, int sequence) {
@@ -444,7 +356,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
     return getPassword(job.getConfiguration());
   }
   protected static byte[] getPassword(Configuration conf) {
-    return getPassword(conf, DEFAULT_SEQUENCE);
+    return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   /**
@@ -463,7 +375,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static boolean canCreateTables(Configuration conf) {
-    return canCreateTables(conf, DEFAULT_SEQUENCE);
+    return canCreateTables(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static boolean canCreateTables(Configuration conf, int sequence) {
@@ -478,7 +390,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static String getDefaultTableName(Configuration conf) {
-    return getDefaultTableName(conf, DEFAULT_SEQUENCE);
+    return getDefaultTableName(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static String getDefaultTableName(Configuration conf, int sequence) {
@@ -493,7 +405,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static Instance getInstance(Configuration conf) {
-    return getInstance(conf, DEFAULT_SEQUENCE);
+    return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static Instance getInstance(Configuration conf, int sequence) {
@@ -510,7 +422,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static long getMaxMutationBufferSize(Configuration conf) {
-    return getMaxMutationBufferSize(conf, DEFAULT_SEQUENCE);
+    return getMaxMutationBufferSize(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static long getMaxMutationBufferSize(Configuration conf, int sequence) {
@@ -525,7 +437,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static int getMaxLatency(Configuration conf) {
-    return getMaxLatency(conf, DEFAULT_SEQUENCE);
+    return getMaxLatency(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static int getMaxLatency(Configuration conf, int sequence) {
@@ -540,7 +452,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static int getMaxWriteThreads(Configuration conf) {
-    return getMaxWriteThreads(conf, DEFAULT_SEQUENCE);
+    return getMaxWriteThreads(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static int getMaxWriteThreads(Configuration conf, int sequence) {
@@ -555,7 +467,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static Level getLogLevel(Configuration conf) {
-    return getLogLevel(conf, DEFAULT_SEQUENCE);
+    return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static Level getLogLevel(Configuration conf, int sequence) {
@@ -573,7 +485,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation>
{
   }
   
   protected static boolean getSimulationMode(Configuration conf) {
-    return getSimulationMode(conf, DEFAULT_SEQUENCE);
+    return getSimulationMode(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
   
   protected static boolean getSimulationMode(Configuration conf, int sequence) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index 7042f19..5c87c13 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -151,124 +151,30 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
   private static final String ITERATORS_DELIM = ",";
 
   private static final String SEQ_DELIM = ".";
-  protected static final int DEFAULT_SEQUENCE = 0;
   
-  private static final String COMMA = ",";
-  private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredsSeqs";
-  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
-  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
-  private static final String TRUE = "true";
 
   private static final String READ_OFFLINE = PREFIX + ".read.offline";
 
+  protected static String merge(String name, Integer sequence) {
+    return name + SEQ_DELIM + sequence;
+  }
+
+
   /**
    * Get a unique identifier for these configurations
    * 
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
   public static synchronized int nextSequence(Configuration conf) {
-    String value = conf.get(CONFIGURED_SEQUENCES);
-    if (null == value) {
-      conf.set(CONFIGURED_SEQUENCES, "1");
-      return 1;
-    } else {
-      String[] splitValues = StringUtils.split(value, COMMA);
-      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
-      
-      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
-      return newValue;
-    }
+    return SequencedFormatHelper.nextSequence(conf, PREFIX);
   }
   
-  /**
-   * Using the provided Configuration, return the next sequence number to process.
-   * @param conf A Configuration object used to store AccumuloInputFormat information into
-   * @return The next sequence number to process, -1 when finished.
-   * @throws NoSuchElementException
-   */
-  protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException
{
-    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
-    
-    // We haven't set anything, so we need to find the first to return
-    if (null == processedConfs || 0 == processedConfs.length) {
-      // Check to see if the default sequence was used
-      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
-      
-      // If so, set that we're processing it and return the value of the default
-      if (defaultSeqUsed) {
-        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
-        return DEFAULT_SEQUENCE;
-      }
-      
-      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
-      
-      // There was *nothing* loaded, fail.
-      if (null == loadedConfs || 0 == loadedConfs.length) {
-        throw new NoSuchElementException("Sequence was requested to process but none exist
to return");
-      }
-      
-      // We have loaded configuration(s), use the first
-      int firstLoaded = Integer.parseInt(loadedConfs[0]);
-      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
-      
-      return firstLoaded;
-    }
-    
-    // We've previously parsed some confs, need to find the next one to load
-    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
-    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
-    
-    // We only have the default sequence, no specifics.
-    // Getting here, we already know that we processed that default
-    if (null == configuredSequencesArray) {
-      return -1;
-    }
-
-    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length
+ 1);
-    
-    // If we used the default sequence ID, add that into the list of configured sequences
-    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
-      configuredSequences.add(DEFAULT_SEQUENCE);
-    }
-
-    // Add the rest of any sequences to our list
-    for (String configuredSequence : configuredSequencesArray) {
-      configuredSequences.add(Integer.parseInt(configuredSequence));
-    }
-    
-    int lastParsedSeqIndex = configuredSequences.size() - 1;
-    
-    // Find the next sequence number after the one we last processed
-    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
-      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
-      
-      if (lastLoadedValue == lastProcessedSeq) {
-        break;
-      }
-    }
-    
-    // We either had no sequences to match or we matched the last configured sequence
-    // Both of which are equivalent to no (more) sequences to process
-    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size())
{
-      return -1;
-    }
-    
-    // Get the value of the sequence at that offset
-    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
-    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
-    
-    return nextSequence;
+  protected static int nextSequenceToProcess(Configuration conf) {
+    return SequencedFormatHelper.nextSequenceToProcess(conf, PREFIX);
   }
   
   protected static void setDefaultSequenceUsed(Configuration conf) {
-    String value = conf.get(DEFAULT_SEQ_USED);
-    if (null == value || !TRUE.equals(value)) {
-      conf.setBoolean(DEFAULT_SEQ_USED, true);
-    }
-  }
-
-  protected static String merge(String name, Integer sequence) {
-    return name + SEQ_DELIM + sequence;
+    SequencedFormatHelper.setDefaultSequenceUsed(conf, PREFIX);
   }
   
   public static Map<String,String> getRelevantEntries(Configuration conf) {
@@ -302,7 +208,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setIsolated(Configuration conf, boolean enable) {
     setDefaultSequenceUsed(conf);
-    setIsolated(conf, DEFAULT_SEQUENCE, enable);
+    setIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable);
   }
 
   /**
@@ -334,7 +240,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setLocalIterators(Configuration conf, boolean enable) {
     setDefaultSequenceUsed(conf);
-    setLocalIterators(conf, DEFAULT_SEQUENCE, enable);
+    setLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, enable);
   }
 
   /**
@@ -372,7 +278,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setInputInfo(Configuration conf, String user, byte[] passwd, String
table, Authorizations auths) {
     setDefaultSequenceUsed(conf);
-    setInputInfo(conf, DEFAULT_SEQUENCE, user, passwd, table, auths);
+    setInputInfo(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, user, passwd, table, auths);
   }
 
   /**
@@ -422,7 +328,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String
zooKeepers) {
     setDefaultSequenceUsed(conf);
-    setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
+    setZooKeeperInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
 
   /**
@@ -463,7 +369,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setMockInstance(Configuration conf, String instanceName) {
     setDefaultSequenceUsed(conf);
-    setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
+    setMockInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, instanceName);
   }
 
   /**
@@ -497,7 +403,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setRanges(Configuration conf, Collection<Range> ranges) {
     setDefaultSequenceUsed(conf);
-    setRanges(conf, DEFAULT_SEQUENCE, ranges);
+    setRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, ranges);
   }
 
   /**
@@ -539,7 +445,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void disableAutoAdjustRanges(Configuration conf) {
     setDefaultSequenceUsed(conf);
-    disableAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
+    disableAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -569,7 +475,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setRegex(JobContext job, RegexType type, String regex) {
     setDefaultSequenceUsed(job.getConfiguration());
-    setRegex(job, DEFAULT_SEQUENCE, type, regex);
+    setRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type, regex);
   }
 
   /**
@@ -626,7 +532,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setMaxVersions(Configuration conf, int maxVersions) throws IOException
{
     setDefaultSequenceUsed(conf);
-    setMaxVersions(conf, DEFAULT_SEQUENCE, maxVersions);
+    setMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, maxVersions);
   }
 
   /**
@@ -675,7 +581,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
 
   public static void setScanOffline(Configuration conf, boolean scanOff) {
     setDefaultSequenceUsed(conf);
-    setScanOffline(conf, DEFAULT_SEQUENCE, scanOff);
+    setScanOffline(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, scanOff);
   }
 
   /**
@@ -727,7 +633,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void fetchColumns(Configuration conf, Collection<Pair<Text,Text>>
columnFamilyColumnQualifierPairs) {
     setDefaultSequenceUsed(conf);
-    fetchColumns(conf, DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
+    fetchColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, columnFamilyColumnQualifierPairs);
   }
 
   /**
@@ -771,7 +677,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setLogLevel(Configuration conf, Level level) {
     setDefaultSequenceUsed(conf);
-    setLogLevel(conf, DEFAULT_SEQUENCE, level);
+    setLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, level);
   }
 
   /**
@@ -806,7 +712,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void addIterator(Configuration conf, IteratorSetting cfg) {
     setDefaultSequenceUsed(conf);
-    addIterator(conf, DEFAULT_SEQUENCE, cfg);
+    addIterator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE, cfg);
   }
 
   /**
@@ -866,7 +772,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setIterator(JobContext job, int priority, String iteratorClass, String
iteratorName) {
     setDefaultSequenceUsed(job.getConfiguration());
-    setIterator(job, DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
+    setIterator(job, SequencedFormatHelper.DEFAULT_SEQUENCE, priority, iteratorClass, iteratorName);
   }
 
   /**
@@ -916,7 +822,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   public static void setIteratorOption(JobContext job, String iteratorName, String key, String
value) {
     setDefaultSequenceUsed(job.getConfiguration());
-    setIteratorOption(job, DEFAULT_SEQUENCE, iteratorName, key, value);
+    setIteratorOption(job, SequencedFormatHelper.DEFAULT_SEQUENCE, iteratorName, key, value);
   }
 
   /**
@@ -967,7 +873,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setIsolated(Configuration, boolean)
    */
   protected static boolean isIsolated(Configuration conf) {
-    return isIsolated(conf, DEFAULT_SEQUENCE);
+    return isIsolated(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -998,7 +904,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setLocalIterators(Configuration, boolean)
    */
   protected static boolean usesLocalIterators(Configuration conf) {
-    return usesLocalIterators(conf, DEFAULT_SEQUENCE);
+    return usesLocalIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1029,7 +935,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static String getUsername(Configuration conf) {
-    return getUsername(conf, DEFAULT_SEQUENCE);
+    return getUsername(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1064,7 +970,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static byte[] getPassword(Configuration conf) {
-    return getPassword(conf, DEFAULT_SEQUENCE);
+    return getPassword(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1096,7 +1002,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static String getTablename(Configuration conf) {
-    return getTablename(conf, DEFAULT_SEQUENCE);
+    return getTablename(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1127,7 +1033,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setInputInfo(Configuration, String, byte[], String, Authorizations)
    */
   protected static Authorizations getAuthorizations(Configuration conf) {
-    return getAuthorizations(conf, DEFAULT_SEQUENCE);
+    return getAuthorizations(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1160,7 +1066,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setMockInstance(Configuration, String)
    */
   protected static Instance getInstance(Configuration conf) {
-    return getInstance(conf, DEFAULT_SEQUENCE);
+    return getInstance(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1195,7 +1101,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    *           if the table name set on the configuration doesn't exist
    */
   protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException
{
-    return getTabletLocator(conf, DEFAULT_SEQUENCE);
+    return getTabletLocator(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1236,7 +1142,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setRanges(Configuration, Collection)
    */
   protected static List<Range> getRanges(Configuration conf) throws IOException {
-    return getRanges(conf, DEFAULT_SEQUENCE);
+    return getRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1265,7 +1171,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setRegex(JobContext, RegexType, String)
    */
   protected static String getRegex(JobContext job, RegexType type) {
-    return getRegex(job, DEFAULT_SEQUENCE, type);
+    return getRegex(job, SequencedFormatHelper.DEFAULT_SEQUENCE, type);
   }
 
   /**
@@ -1317,7 +1223,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #fetchColumns(Configuration, Collection)
    */
   protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf)
{
-    return getFetchedColumns(conf, DEFAULT_SEQUENCE);
+    return getFetchedColumns(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1355,7 +1261,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #disableAutoAdjustRanges(Configuration)
    */
   protected static boolean getAutoAdjustRanges(Configuration conf) {
-    return getAutoAdjustRanges(conf, DEFAULT_SEQUENCE);
+    return getAutoAdjustRanges(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1386,7 +1292,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #setLogLevel(Configuration, Level)
    */
   protected static Level getLogLevel(Configuration conf) {
-    return getLogLevel(conf, DEFAULT_SEQUENCE);
+    return getLogLevel(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1421,7 +1327,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    *           if the configuration is improperly configured
    */
   protected static void validateOptions(Configuration conf) throws IOException {
-    validateOptions(conf, DEFAULT_SEQUENCE);
+    validateOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   // InputFormat doesn't have the equivalent of OutputFormat's
@@ -1479,7 +1385,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    */
   protected static int getMaxVersions(Configuration conf) {
     setDefaultSequenceUsed(conf);
-    return getMaxVersions(conf, DEFAULT_SEQUENCE);
+    return getMaxVersions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1495,7 +1401,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
   }
 
   protected static boolean isOfflineScan(Configuration conf) {
-    return isOfflineScan(conf, DEFAULT_SEQUENCE);
+    return isOfflineScan(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   protected static boolean isOfflineScan(Configuration conf, int sequence) {
@@ -1520,7 +1426,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #addIterator(Configuration, IteratorSetting)
    */
   protected static List<AccumuloIterator> getIterators(Configuration conf) {
-    return getIterators(conf, DEFAULT_SEQUENCE);
+    return getIterators(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1565,7 +1471,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
    * @see #addIterator(Configuration, IteratorSetting)
    */
   protected static List<AccumuloIteratorOption> getIteratorOptions(Configuration conf)
{
-    return getIteratorOptions(conf, DEFAULT_SEQUENCE);
+    return getIteratorOptions(conf, SequencedFormatHelper.DEFAULT_SEQUENCE);
   }
 
   /**
@@ -1635,7 +1541,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
      * @deprecated Use {@link #setupIterators(Configuration,Scanner)} instead
      */
     protected void setupIterators(TaskAttemptContext attempt, Scanner scanner) throws AccumuloException
{
-      setupIterators(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner);
+      setupIterators(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE,
scanner);
     }
 
     /**
@@ -1667,7 +1573,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
      * @deprecated Use {@link #setupMaxVersions(Configuration,Scanner)} instead
      */
     protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner) {
-      setupMaxVersions(attempt.getConfiguration(), DEFAULT_SEQUENCE, scanner);
+      setupMaxVersions(attempt.getConfiguration(), SequencedFormatHelper.DEFAULT_SEQUENCE,
scanner);
     }
 
     /**
@@ -2046,7 +1952,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V>
{
     }
 
     RangeInputSplit(String table, Range range, String[] locations) {
-      this(table, range, locations, DEFAULT_SEQUENCE);
+      this(table, range, locations, SequencedFormatHelper.DEFAULT_SEQUENCE);
     }
 
     RangeInputSplit(String table, Range range, String[] locations, int sequence) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
new file mode 100644
index 0000000..ff18754
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/SequencedFormatHelper.java
@@ -0,0 +1,145 @@
+package org.apache.accumulo.core.client.mapreduce;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Convenience class with methods useful to dealing with multiple configurations of AccumuloInputFormat
and/or AccumuloOutputFormat in the same Configuration
+ * object.
+ */
+public class SequencedFormatHelper {
+
+  private static final String COMMA = ",";
+  private static final String TRUE = "true";
+  protected static final int DEFAULT_SEQUENCE = 0;
+
+  private static final String DEFAULT_SEQ_USED = ".defaultSequenceUsed";
+  private static final String CONFIGURED_SEQUENCES = ".configuredSeqs";
+  private static final String PROCESSED_SEQUENCES = ".processedSeqs";
+
+  /**
+   * Get a unique identifier for these configurations
+   * 
+   * @return A unique number to provide to future AccumuloInputFormat calls
+   */
+  public static synchronized int nextSequence(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String configuredSequences = prefix + CONFIGURED_SEQUENCES;
+
+    String value = conf.get(configuredSequences);
+    if (null == value) {
+      conf.set(configuredSequences, "1");
+      return 1;
+    } else {
+      String[] splitValues = StringUtils.split(value, COMMA);
+      int newValue = Integer.parseInt(splitValues[splitValues.length - 1]) + 1;
+
+      conf.set(configuredSequences, value + COMMA + newValue);
+      return newValue;
+    }
+  }
+
+  protected static void setDefaultSequenceUsed(Configuration conf, String prefix) {
+    ArgumentChecker.notNull(conf, prefix);
+
+    final String configuredSequences = prefix + DEFAULT_SEQ_USED;
+
+    String value = conf.get(configuredSequences);
+    if (null == value || !TRUE.equals(value)) {
+      conf.setBoolean(configuredSequences, true);
+    }
+  }
+
+  /**
+   * Using the provided Configuration, return the next sequence number to process.
+   * 
+   * @param conf
+   *          A Configuration object used to store AccumuloInputFormat information into
+   * @return The next sequence number to process, -1 when finished.
+   * @throws NoSuchElementException
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf, String prefix)
throws NoSuchElementException {
+    ArgumentChecker.notNull(prefix);
+
+    final String processedSequences = prefix + PROCESSED_SEQUENCES, defaultSequenceUsed =
prefix + DEFAULT_SEQ_USED, configuredSequences = prefix
+        + CONFIGURED_SEQUENCES;
+
+    String[] processedConfs = conf.getStrings(processedSequences);
+
+    // We haven't set anything, so we need to find the first to return
+    if (null == processedConfs || 0 == processedConfs.length) {
+      // Check to see if the default sequence was used
+      boolean defaultSeqUsed = conf.getBoolean(defaultSequenceUsed, false);
+
+      // If so, set that we're processing it and return the value of the default
+      if (defaultSeqUsed) {
+        conf.set(processedSequences, Integer.toString(DEFAULT_SEQUENCE));
+        return DEFAULT_SEQUENCE;
+      }
+
+      String[] loadedConfs = conf.getStrings(configuredSequences);
+
+      // There was *nothing* loaded, fail.
+      if (null == loadedConfs || 0 == loadedConfs.length) {
+        throw new NoSuchElementException("Sequence was requested to process but none exist
to return");
+      }
+
+      // We have loaded configuration(s), use the first
+      int firstLoaded = Integer.parseInt(loadedConfs[0]);
+      conf.setInt(processedSequences, firstLoaded);
+
+      return firstLoaded;
+    }
+
+    // We've previously parsed some confs, need to find the next one to load
+    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
+    String[] configuredSequencesArray = conf.getStrings(configuredSequences);
+
+    // We only have the default sequence, no specifics.
+    // Getting here, we already know that we processed that default
+    if (null == configuredSequencesArray) {
+      return -1;
+    }
+
+    List<Integer> configuredSequencesList = new ArrayList<Integer>(configuredSequencesArray.length
+ 1);
+
+    // If we used the default sequence ID, add that into the list of configured sequences
+    if (conf.getBoolean(defaultSequenceUsed, false)) {
+      configuredSequencesList.add(DEFAULT_SEQUENCE);
+    }
+
+    // Add the rest of any sequences to our list
+    for (String configuredSequence : configuredSequencesArray) {
+      configuredSequencesList.add(Integer.parseInt(configuredSequence));
+    }
+
+    int lastParsedSeqIndex = configuredSequencesList.size() - 1;
+
+    // Find the next sequence number after the one we last processed
+    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
+      int lastLoadedValue = configuredSequencesList.get(lastParsedSeqIndex);
+
+      if (lastLoadedValue == lastProcessedSeq) {
+        break;
+      }
+    }
+
+    // We either had no sequences to match or we matched the last configured sequence
+    // Both of which are equivalent to no (more) sequences to process
+    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequencesList.size())
{
+      return -1;
+    }
+
+    // Get the value of the sequence at that offset
+    int nextSequence = configuredSequencesList.get(lastParsedSeqIndex + 1);
+    conf.set(processedSequences, conf.get(processedSequences) + COMMA + nextSequence);
+
+    return nextSequence;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f10a6ff/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index 98f3e7a..a6f5c48 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloInputFormatTest {
@@ -101,7 +100,7 @@ public class AccumuloInputFormatTest {
     
     AccumuloInputFormat.setIterator(job, 1, "org.apache.accumulo.core.iterators.WholeRowIterator",
"WholeRow");
     Configuration conf = job.getConfiguration();
-    String iterators = conf.get("AccumuloInputFormat.iterators." + InputFormatBase.DEFAULT_SEQUENCE);
+    String iterators = conf.get("AccumuloInputFormat.iterators." + SequencedFormatHelper.DEFAULT_SEQUENCE);
     assertEquals("1:org.apache.accumulo.core.iterators.WholeRowIterator:WholeRow", iterators);
   }
   
@@ -164,7 +163,7 @@ public class AccumuloInputFormatTest {
     
     final String rawConfigOpt = new AccumuloIteratorOption("iterator", key, value).toString();
     
-    assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options."
+ InputFormatBase.DEFAULT_SEQUENCE));
+    assertEquals(rawConfigOpt, job.getConfiguration().get("AccumuloInputFormat.iterators.options."
+ SequencedFormatHelper.DEFAULT_SEQUENCE));
     
     List<AccumuloIteratorOption> opts = AccumuloInputFormat.getIteratorOptions(job.getConfiguration());
     assertEquals(1, opts.size());
@@ -228,7 +227,7 @@ public class AccumuloInputFormatTest {
     AccumuloInputFormat.setIteratorOption(job, "someIterator", "aKey", "aValue");
     
     Configuration conf = job.getConfiguration();
-    String options = conf.get("AccumuloInputFormat.iterators.options." + InputFormatBase.DEFAULT_SEQUENCE);
+    String options = conf.get("AccumuloInputFormat.iterators.options." + SequencedFormatHelper.DEFAULT_SEQUENCE);
     assertEquals(new String("someIterator:aKey:aValue"), options);
   }
   


Mime
View raw message