accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [03/10] git commit: ACCUMULO-1783 Rework AccumuloStorage, diverging a little from AccumuloWholeRowStorage. Not sure yet if this is completely desirable or not.
Date Thu, 31 Oct 2013 03:25:15 GMT
ACCUMULO-1783 Rework AccumuloStorage, diverging a little from
AccumuloWholeRowStorage. Not sure yet if this is completely desirable or
not.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/58e5a7ec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/58e5a7ec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/58e5a7ec

Branch: refs/heads/ACCUMULO-1783
Commit: 58e5a7ec00a9198b7f31ac910ee61f1d704aaec7
Parents: 74c01ec
Author: Josh Elser <elserj@apache.org>
Authored: Thu Oct 24 13:33:32 2013 -0700
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Oct 24 13:33:32 2013 -0700

----------------------------------------------------------------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 93 ++++++++++++++++++--
 .../accumulo/pig/AccumuloWholeRowStorage.java   |  2 +-
 .../pig/AbstractAccumuloStorageTest.java        |  2 +-
 .../pig/AccumuloWholeRowStorageTest.java        |  2 +-
 .../java/org/apache/accumulo/pig/TestUtils.java |  4 +-
 5 files changed, 89 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index c72f07f..9c8f002 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -1,23 +1,33 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.SortedMap;
 
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 import com.google.common.collect.Lists;
 
@@ -39,8 +49,72 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   
   @Override
   protected Tuple getTuple(Key key, Value value) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
+    
+    SortedMap<Key,Value> rowKVs = WholeRowIterator.decodeRow(key, value);
+    List<Tuple> columns = new ArrayList<Tuple>(rowKVs.size());
+    
+    List<Object> tupleEntries = Lists.newLinkedList();
+    Iterator<Entry<Key,Value>> iter = rowKVs.entrySet().iterator();
+    List<Entry<Key,Value>> aggregate = Lists.newLinkedList();
+    Entry<Key,Value> currentEntry = null;
+    
+    while (iter.hasNext()) {
+      if (null == currentEntry) {
+        currentEntry = iter.next();
+      } else {
+        Entry<Key,Value> nextEntry = iter.next();
+        
+        // If we have the same colfam
+        if (currentEntry.getKey().equals(nextEntry.getKey(), PartialKey.ROW_COLFAM)) {
+          // Aggregate this entry into the map
+          aggregate.add(nextEntry);
+        } else {
+          // Flush and start again
+          InternalMap map = aggregate(aggregate);
+          tupleEntries.add(map);
+          
+          aggregate = Lists.newLinkedList();
+        }
+      }
+    }
+    
+    if (!aggregate.isEmpty()) {
+      tupleEntries.add(aggregate(aggregate));
+    }
+    
+    // and wrap it in a tuple
+    Tuple tuple = TupleFactory.getInstance().newTuple(tupleEntries.size() + 1);
+    tuple.set(0, new DataByteArray(key.getRow().getBytes()));
+    int i = 1;
+    for (Object obj : tupleEntries)  {
+      tuple.set(i, obj);
+      i++;
+    }
+    
+    return tuple;
+  }
+  
+  private InternalMap aggregate(List<Entry<Key,Value>> columns)  {
+    InternalMap map = new InternalMap();
+    for (Entry<Key,Value> column : columns) {
+      map.put(column.getKey().getColumnQualifier().toString(), new DataByteArray(column.getValue().get()));
+    }
+    
+    return map;
+  }
+  
+  private Tuple columnToTuple(Text colfam, Text colqual, Text colvis, long ts, Value val) throws IOException {
+    Tuple tuple = TupleFactory.getInstance().newTuple(5);
+    tuple.set(0, new DataByteArray(colfam.getBytes()));
+    tuple.set(1, new DataByteArray(colqual.getBytes()));
+    tuple.set(2, new DataByteArray(colvis.getBytes()));
+    tuple.set(3, new Long(ts));
+    tuple.set(4, new DataByteArray(val.get()));
+    return tuple;
+  }
+  
+  protected void configureInputFormat(Configuration conf) {
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override
@@ -57,6 +131,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
    Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0]));
     
     // TODO Can these be lifted up to members of the class instead of this method?
+    // Not sure if AccumuloStorage instances need to be thread-safe or not
     final Text _cfHolder = new Text(), _cqHolder = new Text();
     
     int columnOffset = 0;
@@ -94,9 +169,9 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
             
             mutation.put(_cfHolder, _cqHolder, value);
           } else {
-            // Just put the Map's key into the CF
-            _cfHolder.set(entry.getKey());
-            mutation.put(_cfHolder, EMPTY_TEXT, value);
+            // Just put the Map's key into the CQ
+            _cqHolder.set(entry.getKey());
+            mutation.put(EMPTY_TEXT, _cqHolder, value);
           }
         }
       } else if (null == cf) {
@@ -109,13 +184,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
         // and then shove the value into the Value
         int index = cf.indexOf(COLON);
         if (-1 == index) {
-          _cfHolder.set(cf);
+          _cqHolder.set(cf);
           
-          mutation.put(_cfHolder, EMPTY_TEXT, value);
+          mutation.put(EMPTY_TEXT, _cqHolder, value);
         } else {
-          byte[] cfBytes = cf.getBytes(); 
+          byte[] cfBytes = cf.getBytes();
           _cfHolder.set(cfBytes, 0, index);
-          _cqHolder.set(cfBytes, index+1, cfBytes.length - (index + 1));
+          _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
           
           mutation.put(_cfHolder, _cqHolder, value);
         }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index fcfd55e..af3ee01 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -84,7 +84,7 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
   }
   
   protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(10, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 9b9c3c7..1b5b81a 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -98,7 +98,7 @@ public class AbstractAccumuloStorageTest {
   }
   
   public String getDefaultLoadLocation() {
-    return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2,col3&start=abc&end=z";
+    return "accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2,col3&start=abc&end=z";
   }
   
   public String getDefaultStoreLocation() {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index f8e8fe1..690d86c 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -58,7 +58,7 @@ public class AccumuloWholeRowStorageTest {
     
     Job expected = test.getDefaultExpectedLoadJob();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(10, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
   }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/58e5a7ec/src/test/java/org/apache/accumulo/pig/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/TestUtils.java b/src/test/java/org/apache/accumulo/pig/TestUtils.java
index 6c8bebf..307a871 100644
--- a/src/test/java/org/apache/accumulo/pig/TestUtils.java
+++ b/src/test/java/org/apache/accumulo/pig/TestUtils.java
@@ -39,14 +39,14 @@ public class TestUtils {
     Iterator<Entry<String,String>> expectedIter = expectedConf.iterator();
     while (expectedIter.hasNext()) {
       Entry<String,String> e = expectedIter.next();
-      assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+      assertEquals("Values differed for " + e.getKey(), expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
     }
     
     // Basically, for all the keys in actualConf, make sure the values in both confs are equal
     Iterator<Entry<String,String>> actualIter = actualConf.iterator();
     while (actualIter.hasNext()) {
       Entry<String,String> e = actualIter.next();
-      assertEquals(actualConf.get(e.getKey()), expectedConf.get(e.getKey()));
+      assertEquals("Values differed for " + e.getKey(), expectedConf.get(e.getKey()), actualConf.get(e.getKey()));
     }
   }
   


Mime
View raw message