accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [4/6] git commit: ACCUMULO-1854 Make the same changes to AOF as AIF has.
Date Thu, 07 Nov 2013 05:24:25 GMT
ACCUMULO-1854 Make the same changes to AOF as AIF has.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1fe22381
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1fe22381
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1fe22381

Branch: refs/heads/ACCUMULO-1854-multi-aif
Commit: 1fe223813a246a9943dbc9eda1a71de07ae27f12
Parents: c5dc070
Author: Josh Elser <elserj@apache.org>
Authored: Wed Nov 6 15:44:19 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Nov 6 15:44:19 2013 -0500

----------------------------------------------------------------------
 .../client/mapreduce/AccumuloOutputFormat.java  | 163 +++++++++++++++----
 .../mapreduce/AccumuloOutputFormatTest.java     |   8 +-
 2 files changed, 136 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fe22381/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
index 66e85fd..5e5e43d 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
@@ -17,10 +17,13 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -41,6 +44,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -91,28 +95,126 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   private static final long DEFAULT_MAX_MUTATION_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB
   private static final int DEFAULT_MAX_LATENCY = 60 * 1000; // 1 minute
   private static final int DEFAULT_NUM_WRITE_THREADS = 2;
-
-  private static final AtomicInteger NUM_CONFIGURATIONS_LOADED = new AtomicInteger(0);
-  private static final AtomicInteger NUM_CONFIGURATIONS_PROCESSED = new AtomicInteger(0);
+  
   private static final int DEFAULT_SEQUENCE = 0;
   private static final String SEQ_DELIM = ".";
+  
+  private static final String COMMA = ",";
+  private static final String CONFIGURED_SEQUENCES = PREFIX + ".configuredSeqs";
+  private static final String DEFAULT_SEQ_USED = PREFIX + ".defaultSequenceUsed";
+  private static final String PROCESSED_SEQUENCES = PREFIX + ".processedSeqs";
+  private static final String TRUE = "true";
+
+
 
   /**
    * Get a unique identifier for these configurations
    * 
    * @return A unique number to provide to future AccumuloInputFormat calls
    */
-  public static int nextSequence() {
-    return NUM_CONFIGURATIONS_LOADED.incrementAndGet();
+  public static synchronized int nextSequence(Configuration conf) {
+    String value = conf.get(CONFIGURED_SEQUENCES);
+    if (null == value) {
+      conf.set(CONFIGURED_SEQUENCES, "1");
+      return 1;
+    } else {
+      String[] splitValues = StringUtils.split(value, COMMA);
+      int newValue = Integer.parseInt(splitValues[splitValues.length-1]) + 1;
+      
+      conf.set(CONFIGURED_SEQUENCES, value + COMMA + newValue);
+      return newValue;
+    }
   }
+  
+  /**
+   * Using the provided Configuration, return the next sequence number to process.
+   * @param conf A Configuration object used to store AccumuloInputFormat information into
+   * @return The next sequence number to process, -1 when finished.
+   * @throws NoSuchElementException
+   */
+  protected static synchronized int nextSequenceToProcess(Configuration conf) throws NoSuchElementException {
+    String[] processedConfs = conf.getStrings(PROCESSED_SEQUENCES);
+    
+    // We haven't set anything, so we need to find the first to return
+    if (null == processedConfs || 0 == processedConfs.length) {
+      // Check to see if the default sequence was used
+      boolean defaultSeqUsed = conf.getBoolean(DEFAULT_SEQ_USED, false);
+      
+      // If so, set that we're processing it and return the value of the default
+      if (defaultSeqUsed) {
+        conf.set(PROCESSED_SEQUENCES, Integer.toString(DEFAULT_SEQUENCE));
+        return DEFAULT_SEQUENCE;
+      }
+      
+      String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
+      
+      // There was *nothing* loaded, fail.
+      if (null == loadedConfs || 0 == loadedConfs.length) {
+        throw new NoSuchElementException("Sequence was requested to process but none exist to return");
+      }
+      
+      // We have loaded configuration(s), use the first
+      int firstLoaded = Integer.parseInt(loadedConfs[0]);
+      conf.setInt(PROCESSED_SEQUENCES, firstLoaded);
+      
+      return firstLoaded;
+    }
+    
+    // We've previously parsed some confs, need to find the next one to load
+    int lastProcessedSeq = Integer.valueOf(processedConfs[processedConfs.length - 1]);
+    String[] configuredSequencesArray = conf.getStrings(CONFIGURED_SEQUENCES);
+    
+    // We only have the default sequence, no specifics.
+    // Getting here, we already know that we processed that default
+    if (null == configuredSequencesArray) {
+      return -1;
+    }
 
-  protected static String merge(String name, Integer sequence) {
-    return name + SEQ_DELIM + sequence;
+    List<Integer> configuredSequences = new ArrayList<Integer>(configuredSequencesArray.length + 1);
+    
+    // If we used the default sequence ID, add that into the list of configured sequences
+    if (conf.getBoolean(DEFAULT_SEQ_USED, false)) {
+      configuredSequences.add(DEFAULT_SEQUENCE);
+    }
+
+    // Add the rest of any sequences to our list
+    for (String configuredSequence : configuredSequencesArray) {
+      configuredSequences.add(Integer.parseInt(configuredSequence));
+    }
+    
+    int lastParsedSeqIndex = configuredSequences.size() - 1;
+    
+    // Find the next sequence number after the one we last processed
+    for (; lastParsedSeqIndex >= 0; lastParsedSeqIndex--) {
+      int lastLoadedValue = configuredSequences.get(lastParsedSeqIndex);
+      
+      if (lastLoadedValue == lastProcessedSeq) {
+        break;
+      }
+    }
+    
+    // We either had no sequences to match or we matched the last configured sequence
+    // Both of which are equivalent to no (more) sequences to process
+    if (-1 == lastParsedSeqIndex || lastParsedSeqIndex + 1 >= configuredSequences.size()) {
+      return -1;
+    }
+    
+    // Get the value of the sequence at that offset
+    int nextSequence = configuredSequences.get(lastParsedSeqIndex + 1);
+    conf.set(PROCESSED_SEQUENCES, conf.get(PROCESSED_SEQUENCES) + COMMA + nextSequence);
+    
+    return nextSequence;
   }
   
-  public static void resetCounters() {
-    NUM_CONFIGURATIONS_LOADED.set(0);
-    NUM_CONFIGURATIONS_PROCESSED.set(0);
+  protected static void setDefaultSequenceUsed(Configuration conf) {
+    String value = conf.get(DEFAULT_SEQ_USED);
+    if (null == value || !TRUE.equals(value)) {
+      conf.setBoolean(DEFAULT_SEQ_USED, true);
+    }
+  }
+
+  protected static String merge(String name, Integer sequence) {
+    return name + SEQ_DELIM + sequence;
   }
   
   public static Map<String,String> getRelevantEntries(Configuration conf) {
@@ -163,6 +265,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
    *          the table to use when the tablename is null in the write call
    */
   public static void setOutputInfo(Configuration conf, String user, byte[] passwd, boolean createTables, String defaultTable) {
+    setDefaultSequenceUsed(conf);
     setOutputInfo(conf, DEFAULT_SEQUENCE, user, passwd, createTables, defaultTable);
   }
   
@@ -202,6 +305,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setZooKeeperInstance(Configuration conf, String instanceName, String zooKeepers) {
+    setDefaultSequenceUsed(conf);
     setZooKeeperInstance(conf, DEFAULT_SEQUENCE, instanceName, zooKeepers);
   }
   
@@ -224,6 +328,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMockInstance(Configuration conf, String instanceName) {
+    setDefaultSequenceUsed(conf);
     setMockInstance(conf, DEFAULT_SEQUENCE, instanceName);
   }
   
@@ -241,6 +346,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
+    setDefaultSequenceUsed(conf);
     setMaxMutationBufferSize(conf, DEFAULT_SEQUENCE, numberOfBytes);
   }
   
@@ -256,6 +362,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
+    setDefaultSequenceUsed(conf);
     setMaxLatency(conf, DEFAULT_SEQUENCE, numberOfMilliseconds);
   }
   
@@ -271,6 +378,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
+    setDefaultSequenceUsed(conf);
     setMaxWriteThreads(conf, DEFAULT_SEQUENCE, numberOfThreads);
   }
   
@@ -286,6 +394,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setLogLevel(Configuration conf, Level level) {
+    setDefaultSequenceUsed(conf);
     setLogLevel(conf, DEFAULT_SEQUENCE, level);
   }
   
@@ -302,6 +411,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   }
   
   public static void setSimulationMode(Configuration conf) {
+    setDefaultSequenceUsed(conf);
     setSimulationMode(conf, DEFAULT_SEQUENCE);
   }
   
@@ -625,17 +735,15 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   @Override
   public void checkOutputSpecs(JobContext job) throws IOException {
-    final int sequencesToCheck = NUM_CONFIGURATIONS_LOADED.get();
-    final Configuration conf = job.getConfiguration();
+    Configuration conf = job.getConfiguration();
     
-    if (0 == sequencesToCheck) {
-      log.debug("No configurations loaded, checking the default");
-      checkConfiguration(conf, sequencesToCheck);
-    } else {
-      log.debug(sequencesToCheck + " configurations loaded");
-      for (int i = 1; i <= sequencesToCheck; i++) {
-        checkConfiguration(conf, i);
-      }
+    // Avoid using the above methods as they will alter the conf.
+    // We just want to inspect what is loaded
+    String[] loadedConfs = conf.getStrings(CONFIGURED_SEQUENCES);
+    
+    for (String loadedConf : loadedConfs) {
+      int sequence = Integer.parseInt(loadedConf);
+      checkConfiguration(conf, sequence);
     }
   }
   
@@ -663,18 +771,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> {
   
   @Override
   public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException {
-    final int sequence;
-    if (0 == NUM_CONFIGURATIONS_LOADED.get()) {
-      sequence = DEFAULT_SEQUENCE;
-      
-      log.debug("No sequence numbers were given, falling back to the default sequence number");
-    } else {
-      sequence = NUM_CONFIGURATIONS_PROCESSED.incrementAndGet();
-      
-      if (sequence > NUM_CONFIGURATIONS_LOADED.get()) {
-        log.warn("Attempting to use AccumuloOutputFormat information from Configuration using a sequence number that wasn't assigned");
-      }
-    }
+    final int sequence = nextSequenceToProcess(attempt.getConfiguration());
+    
+    log.debug("Creating RecordWriter for sequence " + sequence);
     
     try {
       return new AccumuloRecordWriter(attempt, sequence);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1fe22381/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
index bc1bd1a..5599cae 100644
--- a/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
+++ b/src/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
@@ -132,9 +132,9 @@ public class AccumuloOutputFormatTest {
   
   @Test
   public void testMultiInstanceConfiguration() throws Exception {
-    int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
-    
     Configuration conf = new Configuration();
+    int seq1 = AccumuloOutputFormat.nextSequence(conf), seq2 = AccumuloOutputFormat.nextSequence(conf);
+    
     AccumuloOutputFormat.setOutputInfo(conf, seq1, "root1", "1".getBytes(), false, "testtable1");
     AccumuloOutputFormat.setMockInstance(conf, seq1, "testinstance1");
     
@@ -161,7 +161,7 @@ public class AccumuloOutputFormatTest {
   @Test
   public void testConfigEntries() throws Exception {
     Configuration conf = new Configuration();
-    int seq1 = AccumuloOutputFormat.nextSequence(), seq2 = AccumuloOutputFormat.nextSequence();
+    int seq1 = AccumuloOutputFormat.nextSequence(conf), seq2 = AccumuloOutputFormat.nextSequence(conf);
     
     AccumuloOutputFormat.setOutputInfo(conf, seq1, "root1", "1".getBytes(), false, "testtable1");
     AccumuloOutputFormat.setZooKeeperInstance(conf, seq1, "instance1", "zk1");
@@ -189,6 +189,8 @@ public class AccumuloOutputFormatTest {
     expected.put(prefix + ".configured.2", "true");
     expected.put(prefix + ".instanceConfigured.2", "true");
     
+    expected.put(prefix + ".configuredSeqs", "1,2");
+    
     Map<String,String> actual = AccumuloOutputFormat.getRelevantEntries(conf);
     
     assertEquals(expected, actual);


Mime
View raw message