accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [5/5] git commit: ACCUMULO-1783 Rework a bit of the storage set(Store)Locations code to account for the upstream changes in the input format code.
Date Thu, 07 Nov 2013 05:24:07 GMT
ACCUMULO-1783 Rework a bit of the storage set(Store)Locations code to
account for the upstream changes in the input format code.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/22498f77
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/22498f77
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/22498f77

Branch: refs/heads/ACCUMULO-1783
Commit: 22498f775db53351d61996f94bfd027a8dbbdf0b
Parents: 904604d
Author: Josh Elser <elserj@apache.org>
Authored: Thu Nov 7 00:22:09 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Nov 7 00:22:09 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 304 ++++++++++++++++---
 .../pig/AbstractAccumuloStorageTest.java        |  25 +-
 .../accumulo/pig/AccumuloPigClusterTest.java    |   4 -
 .../pig/AccumuloWholeRowStorageTest.java        |  11 +-
 4 files changed, 279 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 9473753..1d53371 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -16,24 +16,31 @@
  */
 package org.apache.accumulo.pig;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.SequencedFormatHelper;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,8 +55,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadStoreCaster;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataBag;
@@ -72,7 +79,8 @@ import org.joda.time.DateTime;
 public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface
{
   private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
   
-  private static final String COLON = ":", COMMA = ",";
+  private static final String COLON = ":", COMMA = ",", PERIOD = ".";
+  private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName();
   
   private Configuration conf;
   private RecordReader<Key,Value> reader;
@@ -204,36 +212,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements
StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    int sequence = AccumuloInputFormat.nextSequence();
-    
-    // TODO Something more.. "certain".
-    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-      LOG.warn("InputFormat already configured for " + sequence);
-      return;
-      // throw new RuntimeException("Was provided sequence number which was already configured:
" + sequence);
-    }
-    
-    AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-    if (columnFamilyColumnQualifierPairs.size() > 0) {
-      LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-      AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+    Map<String,String> entries = AccumuloInputFormat.getRelevantEntries(conf);
+
+    if (shouldSetInput(entries)) {
+      int sequence = AccumuloInputFormat.nextSequence(conf);
+      
+      // TODO Something more.. "certain".
+      if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+        LOG.warn("InputFormat already configured for " + sequence);
+        return;
+        // throw new RuntimeException("Was provided sequence number which was already configured:
" + sequence);
+      }
+      
+      AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
+      AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+      if (columnFamilyColumnQualifierPairs.size() > 0) {
+        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+        AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+      }
+      
+      Collection<Range> ranges = Collections.singleton(new Range(start, end));
+      
+      LOG.info("Scanning Accumulo for " + ranges);
+      
+      AccumuloInputFormat.setRanges(conf, sequence, ranges);
+      
+      configureInputFormat(conf, sequence);
     }
-    
-    Collection<Range> ranges = Collections.singleton(new Range(start, end));
-    
-    LOG.info("Scanning Accumulo for " + ranges);
-    
-    AccumuloInputFormat.setRanges(conf, sequence, ranges);
-    
-    configureInputFormat(conf, sequence);
   }
   
   protected void configureInputFormat(Configuration conf, int sequence) {
     
   }
   
-  protected void configureOutputFormat(Configuration conf) {
+  protected void configureOutputFormat(Configuration conf, int sequence) {
     
   }
   
@@ -268,22 +280,240 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements
StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    int sequence = AccumuloOutputFormat.nextSequence();
+    Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf);
     
-    // TODO Something more.. "certain".
-    if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
-      LOG.warn("OutputFormat already configured for " + sequence);
-      return;
-      // throw new RuntimeException("Was provided sequence number which was already configured:
" + sequence);
+    if (shouldSetOutput(entries)) {
+      int sequence = AccumuloOutputFormat.nextSequence(conf);
+      
+      // TODO Something more.. "certain".
+      if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+        LOG.warn("OutputFormat already configured for " + sequence);
+        return;
+        // throw new RuntimeException("Was provided sequence number which was already configured:
" + sequence);
+      }
+      
+      AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
+      AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+      AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
+      AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
+      AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+      
+      configureOutputFormat(conf, sequence);
+    } else {
+      LOG.debug("Not setting configuration as it appears that it has already been set");
     }
+  }
+  
+  private boolean shouldSetInput(Map<String,String> configEntries) {
+    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
     
-    AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
-    AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+    for (Map<String,String> group : groupedConfigEntries.values()) {
+      if (null != inst) {
+        if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".instanceName")) {
+        continue;
+      }
+      
+      if (null != zookeepers) {
+        if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) {
+        continue;
+      }
+      
+      if (null != user) {
+        if (!user.equals(group.get(INPUT_PREFIX + ".username"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".username")) {
+        continue;
+      }
+      
+      if (null != password) {
+        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".password")) {
+        continue;
+      }
+      
+      if (null != table) {
+        if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) {
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".tablename")) {
+        continue;
+      }
+      
+      if (null != authorizations) {
+        if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations")))
{
+          continue;
+        }
+      } else if (null != group.get(INPUT_PREFIX + ".authorizations")) {
+        continue;
+      }
+      
+      String columnValues = group.get(INPUT_PREFIX + ".columns");
+      if (null != columnFamilyColumnQualifierPairs) {
+        StringBuilder expected = new StringBuilder(128);
+        for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) {
+          if (0 < expected.length()) {
+            expected.append(COMMA);
+          }
+          
+          expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))));
+          if (column.getSecond() != null)
+            expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))));
+        }
+        
+        if (!expected.toString().equals(columnValues)) {
+          continue;
+        }
+      } else if (null != columnValues){ 
+        continue;
+      }
+      
+      Range expected = new Range(start, end);
+      String serializedRanges = group.get(INPUT_PREFIX + ".ranges");
+      if (null != serializedRanges) {
+        try {
+          // We currently only support serializing one Range into the Configuration from
this Storage class
+          ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes()));
+          Range range = new Range();
+          range.readFields(new DataInputStream(bais));
+          
+          if (!expected.equals(range)) {
+            continue;
+          }
+        } catch (IOException e) {
+          // Got an exception, they don't match
+          continue;
+        }
+      }
+      
+      
+      // We found a group of entries in the config which are (similar to) what
+      // we would have set.
+      return false;
+    }
+    
+    // We didn't find any entries that seemed to match, write the config
+    return true;
+  }
+  
+  private boolean shouldSetOutput(Map<String,String> configEntries) {
+    Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries);
+    
+    for (Map<String,String> group : groupedConfigEntries.values()) {
+      if (null != inst) {
+        if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) {
+        continue;
+      }
+      
+      if (null != zookeepers) {
+        if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) {
+        continue;
+      }
+      
+      if (null != user) {
+        if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".username")) {
+        continue;
+      }
+      
+      if (null != password) {
+        if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".password")) {
+        continue;
+      }
+      
+      if (null != table) {
+        if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) {
+          continue;
+        }
+      } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) {
+        continue;
+      }
+      
+      String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads");
+      try {
+        if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) {
+            continue;
+          }
+      } catch (NumberFormatException e) {
+        // Wasn't a number, so it can't match what we were expecting
+        continue;
+      }
+      
+      String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory");
+      try {
+        if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) {
+            continue;
+          }
+      } catch (NumberFormatException e) {
+        // Wasn't a number, so it can't match what we were expecting
+        continue;
+      }
+      
+      String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency");
+      try {
+        if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) {
+            continue;
+          }
+      } catch (NumberFormatException e) {
+        // Wasn't a number, so it can't match what we were expecting
+        continue;
+      }
+      
+      // We found a group of entries in the config which are (similar to) what
+      // we would have set.
+      return false;
+    }
+    
+    // We didn't find any entries that seemed to match, write the config
+    return true;
+  }
+  
+  private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) {
+    Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>();
+    for (Entry<String,String> entry : entries.entrySet()) {
+      final String key = entry.getKey(), value = entry.getValue();
+      
+      if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) {
+        continue;
+      }
+      
+      int index = key.lastIndexOf(PERIOD);
+      if (-1 != index) {
+        int group = Integer.parseInt(key.substring(index + 1));
+        String name = key.substring(0, index);
+        
+        Map<String,String> entriesInGroup = groupedEntries.get(group);
+        if (null == entriesInGroup) {
+          entriesInGroup = new HashMap<String,String>();
+          groupedEntries.put(group, entriesInGroup);
+        }
+        
+        entriesInGroup.put(name, value);
+      } else {
+        LOG.warn("Could not parse key: " + key);
+      }
+    }
     
-    configureOutputFormat(conf);
+    return groupedEntries;
   }
   
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index e9f0297..bc886ec 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -33,24 +33,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.data.Tuple;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AbstractAccumuloStorageTest {
   
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
-  
-  public Job getExpectedLoadJob(int sequence, String inst, String zookeepers, String user,
String password, String table, String start, String end,
+  public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password,
String table, String start, String end,
       Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs)
throws IOException {
     Collection<Range> ranges = new LinkedList<Range>();
     ranges.add(new Range(start, end));
     
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
+    final int sequence = AccumuloInputFormat.nextSequence(expectedConf);
     AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table,
authorizations);
     AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
     AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
@@ -58,7 +52,7 @@ public class AbstractAccumuloStorageTest {
     return expected;
   }
   
-  public Job getDefaultExpectedLoadJob(int sequence) throws IOException {
+  public Job getDefaultExpectedLoadJob() throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -73,14 +67,15 @@ public class AbstractAccumuloStorageTest {
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new
Text("cq2")));
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
     
-    Job expected = getExpectedLoadJob(sequence, inst, zookeepers, user, password, table,
start, end, authorizations, columnFamilyColumnQualifierPairs);
+    Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end,
authorizations, columnFamilyColumnQualifierPairs);
     return expected;
   }
   
-  public Job getExpectedStoreJob(int sequence, String inst, String zookeepers, String user,
String password, String table, long maxWriteBufferSize, int writeThreads,
+  public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password,
String table, long maxWriteBufferSize, int writeThreads,
       int maxWriteLatencyMS) throws IOException {
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
+    final int sequence = AccumuloOutputFormat.nextSequence(expectedConf);
     AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(),
true, table);
     AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
     AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
@@ -90,7 +85,7 @@ public class AbstractAccumuloStorageTest {
     return expected;
   }
   
-  public Job getDefaultExpectedStoreJob(int sequence) throws IOException {
+  public Job getDefaultExpectedStoreJob() throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -100,7 +95,7 @@ public class AbstractAccumuloStorageTest {
     int writeThreads = 7;
     int maxWriteLatencyMS = 30000;
     
-    Job expected = getExpectedStoreJob(sequence, inst, zookeepers, user, password, table,
maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+    Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize,
writeThreads, maxWriteLatencyMS);
     return expected;
   }
   
@@ -136,7 +131,7 @@ public class AbstractAccumuloStorageTest {
     s.setLocation(getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedLoadJob(1);
+    Job expected = getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
@@ -150,7 +145,7 @@ public class AbstractAccumuloStorageTest {
     s.setStoreLocation(getDefaultStoreLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedStoreJob(1);
+    Job expected = getDefaultExpectedStoreJob();
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
index 0e2abb5..10b0a2a 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloPigClusterTest.java
@@ -10,8 +10,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
@@ -54,8 +52,6 @@ public class AccumuloPigClusterTest {
   
   @Before
   public void beforeTest() throws Exception {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
     pig = new PigServer(ExecType.LOCAL, conf);
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/22498f77/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 3a0ab85..b57c7ba 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -30,7 +30,6 @@ import java.util.List;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -43,16 +42,9 @@ import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloWholeRowStorageTest {
-
-  @Before
-  public void setup() {
-    AccumuloInputFormat.resetCounters();
-    AccumuloOutputFormat.resetCounters();
-  }
   
   @Test
   public void testConfiguration() throws IOException {
@@ -64,9 +56,10 @@ public class AccumuloWholeRowStorageTest {
     s.setLocation(test.getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
+    // A little brittle. We know this is the sequence number used when we create the DefaultExpectedLoadJob()
     final int sequence = 1;
     
-    Job expected = test.getDefaultExpectedLoadJob(sequence);
+    Job expected = test.getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
     AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
     


Mime
View raw message