accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/2] git commit: ACCUMULO-1783 Rework the setLocation method to ignore the Configuration so we don't shoot ourselves in the foot.
Date Sat, 09 Nov 2013 03:37:25 GMT
Updated Branches:
  refs/heads/ACCUMULO-1783 22498f775 -> 4160c1615


ACCUMULO-1783 Rework the setLocation method to ignore the Configuration
so we don't shoot ourselves in the foot.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/63d29d4d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/63d29d4d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/63d29d4d

Branch: refs/heads/ACCUMULO-1783
Commit: 63d29d4de56f1cc664b5a433dc2d736e3bc7a066
Parents: 22498f7
Author: Josh Elser <elserj@apache.org>
Authored: Fri Nov 8 19:50:19 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Fri Nov 8 19:50:19 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 549 ++++++++++---------
 .../apache/accumulo/pig/AccumuloStorage.java    |   6 +-
 .../accumulo/pig/AccumuloWholeRowStorage.java   |   4 +-
 .../pig/AbstractAccumuloStorageTest.java        |  20 +-
 .../pig/AccumuloWholeRowStorageTest.java        |   5 +-
 5 files changed, 299 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 1d53371..5faf6c6 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.pig;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -32,15 +30,12 @@ import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.SequencedFormatHelper;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -207,45 +202,66 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     return writer;
   }
   
+  protected Map<String,String> getInputFormatEntries(Configuration conf) {
+    return getEntries(conf, INPUT_PREFIX);
+  }
+  
+  protected Map<String,String> getEntries(Configuration conf, String prefix) {
+    Map<String,String> entries = new HashMap<String,String>();
+    
+    for (Entry<String,String> entry : conf) {
+      String key = entry.getKey();
+      if (key.startsWith(prefix)) {
+        entries.put(key, entry.getValue());
+      }
+    }
+    
+    return entries;
+  }
+  
+  
   @Override
   public void setLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    Map<String,String> entries = AccumuloInputFormat.getRelevantEntries(conf);
-
-    if (shouldSetInput(entries)) {
-      int sequence = AccumuloInputFormat.nextSequence(conf);
-      
-      // TODO Something more.. "certain".
-      if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-        LOG.warn("InputFormat already configured for " + sequence);
-        return;
-        // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
-      }
-      
-      AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
-      AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-      if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-        AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
-      }
-      
-      Collection<Range> ranges = Collections.singleton(new Range(start, end));
-      
-      LOG.info("Scanning Accumulo for " + ranges);
-      
-      AccumuloInputFormat.setRanges(conf, sequence, ranges);
-      
-      configureInputFormat(conf, sequence);
+    Map<String,String> entries = getInputFormatEntries(conf);
+    
+    Exception e = new Exception("setLocation");
+    e.printStackTrace(System.out);
+    System.out.println(entries);
+    
+    for (String key : entries.keySet()) {
+      conf.unset(key);
+    }
+    
+    entries = getInputFormatEntries(conf);
+    System.out.println(entries);
+    
+    AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+    if (columnFamilyColumnQualifierPairs.size() > 0) {
+      LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+      AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
     }
+    
+    Collection<Range> ranges = Collections.singleton(new Range(start, end));
+    
+    LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
+    
+    AccumuloInputFormat.setRanges(conf, ranges);
+    
+    configureInputFormat(conf);
+    
+    entries = getInputFormatEntries(conf);
+    System.out.println(entries);
   }
   
-  protected void configureInputFormat(Configuration conf, int sequence) {
+  protected void configureInputFormat(Configuration conf) {
     
   }
   
-  protected void configureOutputFormat(Configuration conf, int sequence) {
+  protected void configureOutputFormat(Configuration conf) {
     
   }
   
@@ -279,242 +295,245 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   public void setStoreLocation(String location, Job job) throws IOException {
     conf = job.getConfiguration();
     setLocationFromUri(location);
-    
-    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
-    
-    if (shouldSetOutput(entries)) {
-      int sequence = AccumuloOutputFormat.nextSequence(conf);
-      
-      // TODO Something more.. "certain".
-      if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-        LOG.warn("OutputFormat already configured for " + sequence);
-        return;
-        // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
-      }
-      
-      AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
-      AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-      AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
-      AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
-      AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
-      
-      configureOutputFormat(conf, sequence);
-    } else {
-      LOG.debug("Not setting configuration as it appears that it has already been set");
+//    
+//    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//    
+//    Exception e = new Exception("setStoreLocation");
+//    e.printStackTrace(System.out);
+//    System.out.println(entries);
+//    
+//    for (String key : entries.keySet()) {
+//      conf.unset(key);
+//    }
+//    
+//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//    System.out.println(entries);
+    if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
+      AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
+      AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+      AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
+      AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
+      AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
+      
+      LOG.info("Writing data to " + table);
+      
+      configureOutputFormat(conf);
     }
-  }
-  
-  private boolean shouldSetInput(Map<String,String> configEntries) {
-    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
     
-    for (Map<String,String> group : groupedConfigEntries.values()) {
-      if (null != inst) {
-        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
-        continue;
-      }
-      
-      if (null != zookeepers) {
-        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
-        continue;
-      }
-      
-      if (null != user) {
-        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".username")) {
-        continue;
-      }
-      
-      if (null != password) {
-        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".password")) {
-        continue;
-      }
-      
-      if (null != table) {
-        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
-        continue;
-      }
-      
-      if (null != authorizations) {
-        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
-          continue;
-        }
-      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
-        continue;
-      }
-      
-      String columnValues = group.get(INPUT_PREFIX + ".columns");
-      if (null != columnFamilyColumnQualifierPairs) {
-        StringBuilder expected = new StringBuilder(128);
-        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
-          if (0 < expected.length()) {
-            expected.append(COMMA);
-          }
-          
-          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
-          if (column.getSecond() != null)
-            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
-        }
-        
-        if (!expected.toString().equals(columnValues)) {
-          continue;
-        }
-      } else if (null != columnValues){ 
-        continue;
-      }
-      
-      Range expected = new Range(start, end);
-      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
-      if (null != serializedRanges) {
-        try {
-          // We currently only support serializing one Range into the Configuration from this Storage class
-          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
-          Range range = new Range();
-          range.readFields(new DataInputStream(bais));
-          
-          if (!expected.equals(range)) {
-            continue;
-          }
-        } catch (IOException e) {
-          // Got an exception, they don't match
-          continue;
-        }
-      }
-      
-      
-      // We found a group of entries in the config which are (similar to) what
-      // we would have set.
-      return false;
-    }
-    
-    // We didn't find any entries that seemed to match, write the config
-    return true;
+//    entries = AccumuloOutputFormat.getRelevantEntries(conf);
+//    System.out.println(entries);
   }
   
-  private boolean shouldSetOutput(Map<String,String> configEntries) {
-    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
-    
-    for (Map<String,String> group : groupedConfigEntries.values()) {
-      if (null != inst) {
-        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
-        continue;
-      }
-      
-      if (null != zookeepers) {
-        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
-        continue;
-      }
-      
-      if (null != user) {
-        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
-        continue;
-      }
-      
-      if (null != password) {
-        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
-        continue;
-      }
-      
-      if (null != table) {
-        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
-          continue;
-        }
-      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
-        continue;
-      }
-      
-      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
-      try {
-        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
-            continue;
-          }
-      } catch (NumberFormatException e) {
-        // Wasn't a number, so it can't match what we were expecting
-        continue;
-      }
-      
-      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
-      try {
-        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
-            continue;
-          }
-      } catch (NumberFormatException e) {
-        // Wasn't a number, so it can't match what we were expecting
-        continue;
-      }
-      
-      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
-      try {
-        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
-            continue;
-          }
-      } catch (NumberFormatException e) {
-        // Wasn't a number, so it can't match what we were expecting
-        continue;
-      }
-      
-      // We found a group of entries in the config which are (similar to) what
-      // we would have set.
-      return false;
-    }
-    
-    // We didn't find any entries that seemed to match, write the config
-    return true;
-  }
-  
-  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
-    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
-    for (Entry<String,String> entry : entries.entrySet()) {
-      final String key = entry.getKey(), value = entry.getValue();
-      
-      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
-        continue;
-      }
-      
-      int index = key.lastIndexOf(PERIOD);
-      if (-1 != index) {
-        int group = Integer.parseInt(key.substring(index + 1));
-        String name = key.substring(0, index);
-        
-        Map<String,String> entriesInGroup = groupedEntries.get(group);
-        if (null == entriesInGroup) {
-          entriesInGroup = new HashMap<String,String>();
-          groupedEntries.put(group, entriesInGroup);
-        }
-        
-        entriesInGroup.put(name, value);
-      } else {
-        LOG.warn("Could not parse key: " + key);
-      }
-    }
-    
-    return groupedEntries;
-  }
+//  private boolean shouldSetInput(Map<String,String> configEntries) {
+//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+//    
+//    for (Map<String,String> group : groupedConfigEntries.values()) {
+//      if (null != inst) {
+//        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
+//        continue;
+//      }
+//      
+//      if (null != zookeepers) {
+//        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
+//        continue;
+//      }
+//      
+//      if (null != user) {
+//        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".username")) {
+//        continue;
+//      }
+//      
+//      if (null != password) {
+//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".password")) {
+//        continue;
+//      }
+//      
+//      if (null != table) {
+//        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
+//        continue;
+//      }
+//      
+//      if (null != authorizations) {
+//        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
+//        continue;
+//      }
+//      
+//      String columnValues = group.get(INPUT_PREFIX + ".columns");
+//      if (null != columnFamilyColumnQualifierPairs) {
+//        StringBuilder expected = new StringBuilder(128);
+//        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+//          if (0 < expected.length()) {
+//            expected.append(COMMA);
+//          }
+//          
+//          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
+//          if (column.getSecond() != null)
+//            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
+//        }
+//        
+//        if (!expected.toString().equals(columnValues)) {
+//          continue;
+//        }
+//      } else if (null != columnValues) {
+//        continue;
+//      }
+//      
+//      Range expected = new Range(start, end);
+//      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
+//      if (null != serializedRanges) {
+//        try {
+//          // We currently only support serializing one Range into the Configuration from this Storage class
+//          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
+//          Range range = new Range();
+//          range.readFields(new DataInputStream(bais));
+//          
+//          if (!expected.equals(range)) {
+//            continue;
+//          }
+//        } catch (IOException e) {
+//          // Got an exception, they don't match
+//          continue;
+//        }
+//      }
+//      
+//      // We found a group of entries in the config which are (similar to) what
+//      // we would have set.
+//      return false;
+//    }
+//    
+//    // We didn't find any entries that seemed to match, write the config
+//    return true;
+//  }
+//  
+//  private boolean shouldSetOutput(Map<String,String> configEntries) {
+//    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+//    
+//    for (Map<String,String> group : groupedConfigEntries.values()) {
+//      if (null != inst) {
+//        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
+//        continue;
+//      }
+//      
+//      if (null != zookeepers) {
+//        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
+//        continue;
+//      }
+//      
+//      if (null != user) {
+//        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
+//        continue;
+//      }
+//      
+//      if (null != password) {
+//        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
+//        continue;
+//      }
+//      
+//      if (null != table) {
+//        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
+//          continue;
+//        }
+//      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
+//        continue;
+//      }
+//      
+//      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
+//      try {
+//        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
+//          continue;
+//        }
+//      } catch (NumberFormatException e) {
+//        // Wasn't a number, so it can't match what we were expecting
+//        continue;
+//      }
+//      
+//      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
+//      try {
+//        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
+//          continue;
+//        }
+//      } catch (NumberFormatException e) {
+//        // Wasn't a number, so it can't match what we were expecting
+//        continue;
+//      }
+//      
+//      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
+//      try {
+//        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
+//          continue;
+//        }
+//      } catch (NumberFormatException e) {
+//        // Wasn't a number, so it can't match what we were expecting
+//        continue;
+//      }
+//      
+//      // We found a group of entries in the config which are (similar to) what
+//      // we would have set.
+//      return false;
+//    }
+//    
+//    // We didn't find any entries that seemed to match, write the config
+//    return true;
+//  }
+//  
+//  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
+//    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
+//    for (Entry<String,String> entry : entries.entrySet()) {
+//      final String key = entry.getKey(), value = entry.getValue();
+//      
+//      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
+//        continue;
+//      }
+//      
+//      int index = key.lastIndexOf(PERIOD);
+//      if (-1 != index) {
+//        int group = Integer.parseInt(key.substring(index + 1));
+//        String name = key.substring(0, index);
+//        
+//        Map<String,String> entriesInGroup = groupedEntries.get(group);
+//        if (null == entriesInGroup) {
+//          entriesInGroup = new HashMap<String,String>();
+//          groupedEntries.put(group, entriesInGroup);
+//        }
+//        
+//        entriesInGroup.put(name, value);
+//      } else {
+//        LOG.warn("Could not parse key: " + key);
+//      }
+//    }
+//    
+//    return groupedEntries;
+//  }
   
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index bd43dce..8e9cfef 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -73,7 +73,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   
   @Override
   protected Tuple getTuple(Key key, Value value) throws IOException {
-    
+//    System.out.println(key);
     SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
     
     List<Object> tupleEntries = Lists.newLinkedList();
@@ -142,8 +142,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   }
   
   @Override
-  protected void configureInputFormat(Configuration conf, int sequence) {
-    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index 499558f..a6db638 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -84,8 +84,8 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
   }
   
   @Override
-  protected void configureInputFormat(Configuration conf, int sequence) {
-    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index bc886ec..1b5b81a 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -44,11 +44,10 @@ public class AbstractAccumuloStorageTest {
     
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    final int sequence = AccumuloInputFormat.nextSequence(expectedConf);
-    AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
-    AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
-    AccumuloInputFormat.setRanges(expectedConf, sequence, ranges);
+    AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+    AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
+    AccumuloInputFormat.setRanges(expectedConf, ranges);
     return expected;
   }
   
@@ -75,12 +74,11 @@ public class AbstractAccumuloStorageTest {
       int maxWriteLatencyMS) throws IOException {
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    final int sequence = AccumuloOutputFormat.nextSequence(expectedConf);
-    AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
-    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, sequence,maxWriteBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, sequence,writeThreads);
+    AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
+    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
     
     return expected;
   }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/63d29d4d/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index b57c7ba..690d86c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -56,12 +56,9 @@ public class AccumuloWholeRowStorageTest {
     s.setLocation(test.getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    // A little brittle. We know this is the sequence number used when we create the DefaultExpectedLoadJob()
-    final int sequence = 1;
-    
     Job expected = test.getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
   }


Mime
View raw message