accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [08/10] git commit: ACCUMULO-1783 Update the Load impl for the store changes
Date Thu, 31 Oct 2013 03:25:20 GMT
ACCUMULO-1783 Update the Load impl for the store changes

For now, add a boolean option on the constructor to say whether or not
entries with the same column family in a row should be aggregated together
into one Map. Removes the implicit rules I had on the cf at storage.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/e0d3ade8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/e0d3ade8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/e0d3ade8

Branch: refs/heads/ACCUMULO-1783
Commit: e0d3ade8ef858f4299bb93f81fb4e52cf18b7de2
Parents: 291b939
Author: Josh Elser <elserj@apache.org>
Authored: Wed Oct 30 22:17:39 2013 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Wed Oct 30 22:17:39 2013 -0400

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 15 ++++++-
 .../accumulo/pig/AccumuloStorageTest.java       | 45 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e0d3ade8/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 97fb44f..cccba64 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -39,6 +39,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   public static final String METADATA_SUFFIX = "_metadata";
   
   protected final List<String> columnSpecs;
+  protected final boolean aggregateColfams;
   
   // Not sure if AccumuloStorage instances need to be thread-safe or not
   final Text _cfHolder = new Text(), _cqHolder = new Text();
@@ -47,8 +48,17 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     this("");
   }
   
+  public AccumuloStorage(boolean aggregateColfams) {
+    this("", aggregateColfams);
+  }
+  
   public AccumuloStorage(String columns) {
+    this(columns, false);
+  }
+  
+  public AccumuloStorage(String columns, boolean aggregateColfams) {
     this.caster = new Utf8StorageConverter();
+    this.aggregateColfams = aggregateColfams;
     
     // TODO It would be nice to have some other means than enumerating
     // the CF for every column in the Tuples we're going process
@@ -77,8 +87,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
       } else {
         Entry<Key,Value> nextEntry = iter.next();
         
-        // If we have the same colfam
-        if (currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
+        // If we're not aggregating colfams together, or we are and we have the same colfam
+        if (!aggregateColfams || currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
           // Aggregate this entry into the map
           aggregate.add(nextEntry);
         } else {
@@ -208,6 +218,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
    */
   protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
     if (null == columnDef && null == columnName) {
+      // TODO Emit a counter here somehow?
       log.warn("Was provided no name or definition for column. Ignoring value");
       return;
     }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/e0d3ade8/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index db80c47..a02ad7c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -358,8 +358,8 @@ public class AccumuloStorageTest {
   }
   
   @Test
-  public void testMultipleColumns() throws IOException {
-    AccumuloStorage storage = new AccumuloStorage();
+  public void testMultipleColumnsAggregateColfams() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage(true);
     
     List<Key> keys = Lists.newArrayList();
     List<Value> values = Lists.newArrayList();
@@ -406,4 +406,45 @@ public class AccumuloStorageTest {
     Assert.assertEquals(map, t.get(3));
   }
   
+  @Test
+  public void testMultipleColumnsNoColfamAggregate() throws IOException {
+    AccumuloStorage storage = new AccumuloStorage(false);
+    
+    List<Key> keys = Lists.newArrayList();
+    List<Value> values = Lists.newArrayList();
+    
+    keys.add(new Key("1", "col1", "cq1"));
+    keys.add(new Key("1", "col1", "cq2"));
+    keys.add(new Key("1", "col1", "cq3"));
+    keys.add(new Key("1", "col2", "cq1"));
+    keys.add(new Key("1", "col3", "cq1"));
+    keys.add(new Key("1", "col3", "cq2"));
+    
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    values.add(new Value("value3".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value1".getBytes()));
+    values.add(new Value("value2".getBytes()));
+    
+    Key k = new Key("1");
+    Value v = WholeRowIterator.encodeRow(keys, values);
+    
+    Tuple t = storage.getTuple(k, v);
+    
+    Assert.assertEquals(2, t.size());
+    
+    Assert.assertEquals("1", t.get(0).toString());
+    
+    InternalMap map = new InternalMap();
+    map.put("col1:cq1", new DataByteArray("value1"));
+    map.put("col1:cq2", new DataByteArray("value2"));
+    map.put("col1:cq3", new DataByteArray("value3"));
+    map.put("col2:cq1", new DataByteArray("value1"));
+    map.put("col3:cq1", new DataByteArray("value1"));
+    map.put("col3:cq2", new DataByteArray("value2"));
+    
+    Assert.assertEquals(map, t.get(1));
+  }
+  
 }


Mime
View raw message