accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cjno...@apache.org
Subject [05/11] Adding new TableQueryConfig object for setting multiple table info in the InputFormatBase
Date Wed, 02 Oct 2013 03:22:41 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fdf4cadb/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
index c9539c4..0f6655a 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
@@ -16,27 +16,40 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.TableQueryConfig;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.RegExFilter;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -49,11 +62,12 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
 public class AccumuloInputFormatTest {
-  
+
   private static final String PREFIX = AccumuloInputFormatTest.class.getSimpleName();
   private static final String INSTANCE_NAME = PREFIX + "_mapreduce_instance";
   private static final String TEST_TABLE_1 = PREFIX + "_mapreduce_table_1";
-  
+  private static final String TEST_TABLE_2 = PREFIX + "_mapreduce_table_2";
+
   /**
    * Check that the iterator configuration is getting stored in the Job conf correctly.
    * 
@@ -62,7 +76,7 @@ public class AccumuloInputFormatTest {
   @Test
   public void testSetIterator() throws IOException {
     Job job = new Job();
-    
+
     IteratorSetting is = new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator");
     AccumuloInputFormat.addIterator(job, is);
     Configuration conf = job.getConfiguration();
@@ -71,36 +85,37 @@ public class AccumuloInputFormatTest {
     String iterators = conf.get("AccumuloInputFormat.ScanOpts.Iterators");
     assertEquals(new String(Base64.encodeBase64(baos.toByteArray())), iterators);
   }
-  
+
   @Test
   public void testAddIterator() throws IOException {
     Job job = new Job();
-    
+
+
     AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", WholeRowIterator.class));
     AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
     IteratorSetting iter = new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator");
     iter.addOption("v1", "1");
     iter.addOption("junk", "\0omg:!\\xyzzy");
     AccumuloInputFormat.addIterator(job, iter);
-    
+
     List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
-    
+
     // Check the list size
     assertTrue(list.size() == 3);
-    
+
     // Walk the list and make sure our settings are correct
     IteratorSetting setting = list.get(0);
     assertEquals(1, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.user.WholeRowIterator", setting.getIteratorClass());
     assertEquals("WholeRow", setting.getName());
     assertEquals(0, setting.getOptions().size());
-    
+
     setting = list.get(1);
     assertEquals(2, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
     assertEquals("Versions", setting.getName());
     assertEquals(0, setting.getOptions().size());
-    
+
     setting = list.get(2);
     assertEquals(3, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
@@ -109,7 +124,7 @@ public class AccumuloInputFormatTest {
     assertEquals("1", setting.getOptions().get("v1"));
     assertEquals("\0omg:!\\xyzzy", setting.getOptions().get("junk"));
   }
-  
+
   /**
    * Test adding iterator options where the keys and values contain both the FIELD_SEPARATOR character (':') and ITERATOR_SEPARATOR (',') characters. There
    * should be no exceptions thrown when trying to parse these types of option entries.
@@ -124,12 +139,12 @@ public class AccumuloInputFormatTest {
     someSetting.addOption(key, value);
     Job job = new Job();
     AccumuloInputFormat.addIterator(job, someSetting);
-    
+
     List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
     assertEquals(1, list.size());
     assertEquals(1, list.get(0).getOptions().size());
     assertEquals(list.get(0).getOptions().get(key), value);
-    
+
     someSetting.addOption(key + "2", value);
     someSetting.setPriority(2);
     someSetting.setName("it2");
@@ -142,7 +157,7 @@ public class AccumuloInputFormatTest {
     assertEquals(list.get(1).getOptions().get(key), value);
     assertEquals(list.get(1).getOptions().get(key + "2"), value);
   }
-  
+
   /**
    * Test getting iterator settings for multiple iterators set
    * 
@@ -151,69 +166,71 @@ public class AccumuloInputFormatTest {
   @Test
   public void testGetIteratorSettings() throws IOException {
     Job job = new Job();
-    
+
     AccumuloInputFormat.addIterator(job, new IteratorSetting(1, "WholeRow", "org.apache.accumulo.core.iterators.WholeRowIterator"));
     AccumuloInputFormat.addIterator(job, new IteratorSetting(2, "Versions", "org.apache.accumulo.core.iterators.VersioningIterator"));
     AccumuloInputFormat.addIterator(job, new IteratorSetting(3, "Count", "org.apache.accumulo.core.iterators.CountingIterator"));
-    
+
     List<IteratorSetting> list = AccumuloInputFormat.getIterators(job);
-    
+
     // Check the list size
     assertTrue(list.size() == 3);
-    
+
     // Walk the list and make sure our settings are correct
     IteratorSetting setting = list.get(0);
     assertEquals(1, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.WholeRowIterator", setting.getIteratorClass());
     assertEquals("WholeRow", setting.getName());
-    
+
     setting = list.get(1);
     assertEquals(2, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.VersioningIterator", setting.getIteratorClass());
     assertEquals("Versions", setting.getName());
-    
+
     setting = list.get(2);
     assertEquals(3, setting.getPriority());
     assertEquals("org.apache.accumulo.core.iterators.CountingIterator", setting.getIteratorClass());
     assertEquals("Count", setting.getName());
-    
+
   }
-  
+
   @Test
   public void testSetRegex() throws IOException {
     Job job = new Job();
-    
+
     String regex = ">\"*%<>\'\\";
-    
+
     IteratorSetting is = new IteratorSetting(50, regex, RegExFilter.class);
     RegExFilter.setRegexs(is, regex, null, null, null, false);
     AccumuloInputFormat.addIterator(job, is);
-    
+
     assertTrue(regex.equals(AccumuloInputFormat.getIterators(job).get(0).getName()));
   }
-  
+
   private static AssertionError e1 = null;
   private static AssertionError e2 = null;
-  
+
   private static class MRTester extends Configured implements Tool {
+
     private static class TestMapper extends Mapper<Key,Value,Key,Value> {
       Key key = null;
       int count = 0;
-      
+
       @Override
       protected void map(Key k, Value v, Context context) throws IOException, InterruptedException {
         try {
+          String tableName = ((InputFormatBase.RangeInputSplit) context.getInputSplit()).getTableName();
           if (key != null)
             assertEquals(key.getRow().toString(), new String(v.get()));
-          assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
-          assertEquals(new String(v.get()), String.format("%09x", count));
+          assertEquals(new Text(String.format("%s_%09x", tableName, count + 1)), k.getRow());
+          assertEquals(String.format("%s_%09x", tableName, count), new String(v.get()));
         } catch (AssertionError e) {
           e1 = e;
         }
         key = new Key(k);
         count++;
       }
-      
+
       @Override
       protected void cleanup(Context context) throws IOException, InterruptedException {
         try {
@@ -223,59 +240,198 @@ public class AccumuloInputFormatTest {
         }
       }
     }
-    
+
     @Override
     public int run(String[] args) throws Exception {
-      
-      if (args.length != 3) {
+
+      if (args.length != 4) {
         throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <user> <pass> <table>");
       }
-      
+
       String user = args[0];
       String pass = args[1];
-      String table = args[2];
-      
+      String table1 = args[2];
+      String table2 = args[3];
+
       Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
       job.setJarByClass(this.getClass());
-      
+
       job.setInputFormatClass(AccumuloInputFormat.class);
-      
+
       AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
-      AccumuloInputFormat.setInputTableName(job, table);
+
+//      AccumuloInputFormat.setInputTableNames(job, Arrays.asList(new String[]{table1, table2}));
+      TableQueryConfig tableConfig1 = new TableQueryConfig(table1);
+      TableQueryConfig tableConfig2 = new TableQueryConfig(table2);
+
+      AccumuloInputFormat.setTableQueryConfigurations(job, tableConfig1, tableConfig2);
       AccumuloInputFormat.setMockInstance(job, INSTANCE_NAME);
-      
+
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
       job.setMapOutputValueClass(Value.class);
       job.setOutputFormatClass(NullOutputFormat.class);
-      
+
       job.setNumReduceTasks(0);
-      
+
       job.waitForCompletion(true);
-      
+
       return job.isSuccessful() ? 0 : 1;
     }
-    
+
     public static void main(String[] args) throws Exception {
       assertEquals(0, ToolRunner.run(CachedConfiguration.getInstance(), new MRTester(), args));
     }
   }
-  
+
+  /**
+   * Generate incrementing counts and attach table name to the key/value so that order and multi-table data can be verified.
+   */
   @Test
   public void testMap() throws Exception {
     MockInstance mockInstance = new MockInstance(INSTANCE_NAME);
     Connector c = mockInstance.getConnector("root", new PasswordToken(""));
     c.tableOperations().create(TEST_TABLE_1);
+    c.tableOperations().create(TEST_TABLE_2);
     BatchWriter bw = c.createBatchWriter(TEST_TABLE_1, new BatchWriterConfig());
+    BatchWriter bw2 = c.createBatchWriter(TEST_TABLE_2, new BatchWriterConfig());
     for (int i = 0; i < 100; i++) {
-      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
-      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
-      bw.addMutation(m);
+      Mutation t1m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_1, i + 1)));
+      t1m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_1, i).getBytes()));
+      bw.addMutation(t1m);
+      Mutation t2m = new Mutation(new Text(String.format("%s_%09x", TEST_TABLE_2, i + 1)));
+      t2m.put(new Text(), new Text(), new Value(String.format("%s_%09x", TEST_TABLE_2, i).getBytes()));
+      bw2.addMutation(t2m);
     }
     bw.close();
-    
-    MRTester.main(new String[] {"root", "", TEST_TABLE_1});
+    bw2.close();
+
+    MRTester.main(new String[] {"root", "", TEST_TABLE_1, TEST_TABLE_2});
     assertNull(e1);
     assertNull(e2);
   }
+
+  /**
+   * Asserts that the configuration contains the expected ranges for the tables.
+   */
+  @Test
+  public void testMultitableRangeSerialization() throws Throwable {
+    List<String> tables = Arrays.asList("t1", "t2", "t3");
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    job.setMapperClass(MRTester.TestMapper.class);
+    job.setNumReduceTasks(0);
+    AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new byte[0]));
+    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
+    AccumuloInputFormat.setMockInstance(job, "testmapinstance");
+
+    HashMap<String,Collection<Range>> tblRanges = new HashMap<String,Collection<Range>>();
+    for (String tbl : tables) {
+      List<Range> ranges = Arrays.asList(new Range("a", "b"), new Range("c", "d"), new Range("e", "f"));
+      tblRanges.put(tbl, ranges);
+    }
+
+    Range defaultRange = new Range("0", "1");
+
+    try {
+      AccumuloInputFormat.setRanges(job, tblRanges);
+      fail("Exception should have been thrown.");
+    } catch(IllegalStateException e) {}
+
+    AccumuloInputFormat.setInputTableNames(job, tables);
+
+    // set a default range
+    AccumuloInputFormat.setRanges(job, Collections.singleton(defaultRange));
+    AccumuloInputFormat.setRanges(job, tblRanges);
+    Map<String,List<Range>> configuredRanges = AccumuloInputFormat.getRanges(job);
+
+    for (Map.Entry<String,List<Range>> cfgRange : configuredRanges.entrySet()) {
+      String tbl = cfgRange.getKey();
+      HashSet<Range> originalRanges = new HashSet<Range>(tblRanges.remove(tbl));
+      originalRanges.add(defaultRange);
+      HashSet<Range> retrievedRanges = new HashSet<Range>(cfgRange.getValue());
+      assertEquals(originalRanges.size(), retrievedRanges.size());
+      assertTrue(originalRanges.containsAll(retrievedRanges));
+      assertTrue(retrievedRanges.containsAll(originalRanges));
+    }
+  }
+
+  /**
+   * Asserts that the configuration contains the expected iterators for the tables.
+   */
+  @Test
+  public void testMultitableIteratorSerialization() throws Throwable {
+    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2"));
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    job.setMapperClass(MRTester.TestMapper.class);
+    job.setNumReduceTasks(0);
+    AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new byte[0]));
+    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
+
+    // create + set iterators on configuration and build expected reference set
+    IteratorSetting isetting1 = new IteratorSetting(1, "name1", "class1");
+    IteratorSetting isetting2 = new IteratorSetting(2, "name2", "class2");
+    IteratorSetting isetting3 = new IteratorSetting(2, "name3", "class3");
+
+    try {
+      AccumuloInputFormat.addIterator(job, "t1", isetting1);
+      fail("Exception should have been thrown.");
+    } catch(IllegalStateException e) {}
+
+    AccumuloInputFormat.setInputTableNames(job, tables);
+
+    AccumuloInputFormat.addIterator(job, "t1", isetting1);
+    AccumuloInputFormat.addIterator(job, "t2", isetting2);
+    AccumuloInputFormat.addIterator(job, isetting3);
+
+    // verify per-table iterators
+    List<IteratorSetting> t1iters = AccumuloInputFormat.getIterators(job, "t1");
+    List<IteratorSetting> t2iters = AccumuloInputFormat.getIterators(job, "t2");
+    assertFalse(t1iters.isEmpty());
+    assertEquals(isetting1, t1iters.get(1));
+    assertEquals(isetting3, t1iters.get(0));
+    assertEquals(isetting2, t2iters.get(1));
+    assertEquals(isetting3, t2iters.get(0));
+  }
+
+  @Test
+  public void testMultitableColumnSerialization() throws IOException, AccumuloSecurityException {
+    HashSet<String> tables = new HashSet<String>(Arrays.asList("t1", "t2"));
+    Job job = new Job(new Configuration());
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    job.setMapperClass(MRTester.TestMapper.class);
+    job.setNumReduceTasks(0);
+    AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken(new byte[0]));
+
+    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations());
+
+    Map<String,Collection<Pair<Text,Text>>> columns = new HashMap<String,Collection<Pair<Text,Text>>>();
+    HashSet<Pair<Text,Text>> t1cols = new HashSet<Pair<Text,Text>>();
+    t1cols.add(new Pair(new Text("a"), new Text("b")));
+    HashSet<Pair<Text,Text>> t2cols = new HashSet<Pair<Text,Text>>();
+    t2cols.add(new Pair(new Text("b"), new Text("c")));
+    columns.put("t1", t1cols);
+    columns.put("t2", t2cols);
+
+    Pair<Text,Text> defaultColumn = new Pair(new Text("c"), new Text("d"));
+
+    try {
+      AccumuloInputFormat.fetchColumns(job, columns);
+      fail("Exception should have been thrown.");
+    } catch(IllegalStateException e) {}
+
+    AccumuloInputFormat.setInputTableNames(job, tables);
+
+    AccumuloInputFormat.fetchColumns(job, Collections.singleton(defaultColumn));
+    AccumuloInputFormat.fetchColumns(job, columns);
+
+    columns.get("t1").add(defaultColumn);
+    columns.get("t2").add(defaultColumn);
+
+    Collection<Pair<Text,Text>> t1actual = AccumuloInputFormat.getFetchedColumns(job, "t1");
+    assertEquals(columns.get("t1"), t1actual);
+    Collection<Pair<Text,Text>> t2actual = AccumuloInputFormat.getFetchedColumns(job, "t2");
+    assertEquals(columns.get("t2"), t2actual);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/fdf4cadb/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
new file mode 100644
index 0000000..1d5d187
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/conf/TableQueryConfigTest.java
@@ -0,0 +1,93 @@
+package org.apache.accumulo.core.conf;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TableQueryConfigTest {
+  
+  private static final String TEST_TABLE = "TEST_TABLE";
+  private TableQueryConfig tableQueryConfig;
+  
+  @Before
+  public void setUp() {
+    tableQueryConfig = new TableQueryConfig(TEST_TABLE);
+  }
+  
+  @Test
+  public void testSerialization_OnlyTable() throws IOException {
+    byte[] serialized = serialize(tableQueryConfig);
+    ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+    TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais));
+    bais.close();
+    
+    assertEquals(tableQueryConfig, actualConfig);
+  }
+  
+  @Test
+  public void testSerialization_ranges() throws IOException {
+    List<Range> ranges = new ArrayList<Range>();
+    ranges.add(new Range("a", "b"));
+    ranges.add(new Range("c", "d"));
+    tableQueryConfig.setRanges(ranges);
+    
+    byte[] serialized = serialize(tableQueryConfig);
+    ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+    TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais));
+    bais.close();
+    
+    assertEquals(ranges, actualConfig.getRanges());
+  }
+  
+  @Test
+  public void testSerialization_columns() throws IOException {
+    Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
+    columns.add(new Pair<Text,Text>(new Text("cf1"), new Text("cq1")));
+    columns.add(new Pair<Text,Text>(new Text("cf2"), null));
+    tableQueryConfig.setColumns(columns);
+    
+    byte[] serialized = serialize(tableQueryConfig);
+    ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+    TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais));
+    bais.close();
+    
+    assertEquals(actualConfig.getColumns(), columns);
+  }
+  
+  @Test
+  public void testSerialization_iterators() throws IOException {
+    List<IteratorSetting> settings = new ArrayList<IteratorSetting>();
+    settings.add(new IteratorSetting(50, "iter", "iterclass"));
+    settings.add(new IteratorSetting(55, "iter2", "iterclass2"));
+    tableQueryConfig.setIterators(settings);
+    byte[] serialized = serialize(tableQueryConfig);
+    ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+    TableQueryConfig actualConfig = new TableQueryConfig(new DataInputStream(bais));
+    bais.close();
+    
+    assertEquals(actualConfig.getIterators(), settings);
+  }
+  
+  private byte[] serialize(TableQueryConfig tableQueryConfig) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    tableQueryConfig.write(new DataOutputStream(baos));
+    baos.close();
+    
+    return baos.toByteArray();
+  }
+}


Mime
View raw message