accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [02/10] git commit: ACCUMULO-1783 Add in the implementation for getMutations for AccumuloStorage.
Date Thu, 31 Oct 2013 03:25:14 GMT
ACCUMULO-1783 Add in the implementation for getMutations for
AccumuloStorage.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/74c01ec2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/74c01ec2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/74c01ec2

Branch: refs/heads/ACCUMULO-1783
Commit: 74c01ec215ee7c50fd75c16d606fc8c073f25c8f
Parents: 294f9ce
Author: Josh Elser <elserj@apache.org>
Authored: Thu Oct 24 12:19:29 2013 -0700
Committer: Josh Elser <elserj@apache.org>
Committed: Thu Oct 24 12:19:29 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 .../accumulo/pig/AbstractAccumuloStorage.java   | 186 +++++++++++++++++--
 .../apache/accumulo/pig/AccumuloKVStorage.java  | 150 ---------------
 .../apache/accumulo/pig/AccumuloStorage.java    | 134 +++++++++++++
 4 files changed, 312 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 249dcce..630d5e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,11 @@
       <artifactId>joda-time</artifactId>
       <version>1.6</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>15.0</version>
+    </dependency>
   </dependencies>
   
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 0424b8a..494fd72 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -17,10 +17,13 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
@@ -31,6 +34,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,12 +46,19 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadStoreCaster;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.joda.time.DateTime;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo
@@ -61,6 +72,8 @@ import org.apache.pig.impl.util.UDFContext;
 public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface
{
   private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class);
   
+  private static final String COLON = ":", COMMA = ",";
+  
   private Configuration conf;
   private RecordReader<Key,Value> reader;
   private RecordWriter<Text,Mutation> writer;
@@ -81,7 +94,9 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   int maxWriteThreads = 10;
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
-  
+
+  protected LoadStoreCaster caster;
+  protected ResourceSchema schema;
   protected String contextSignature = null;
   
   public AbstractAccumuloStorage() {}
@@ -118,7 +133,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   
   private void setLocationFromUri(String location) throws IOException {
     // ex:
-    // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&start=abc&end=z
+    // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z
     String columns = "";
     try {
       if (!location.startsWith("accumulo://"))
@@ -137,7 +152,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
             zookeepers = pair[1];
           else if (pair[0].equals("auths"))
             auths = pair[1];
-          else if (pair[0].equals("columns"))
+          else if (pair[0].equals("fetch_columns"))
             columns = pair[1];
           else if (pair[0].equals("start"))
             start = pair[1];
@@ -158,13 +173,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
       if (auths == null || auths.equals("")) {
         authorizations = new Authorizations();
       } else {
-        authorizations = new Authorizations(auths.split(","));
+        authorizations = new Authorizations(auths.split(COMMA));
       }
       
-      if (!columns.equals("")) {
-        for (String cfCq : columns.split(",")) {
-          if (cfCq.contains("|")) {
-            String[] c = cfCq.split("\\|");
+      if (!StringUtils.isEmpty(columns)) {
+        for (String cfCq : columns.split(COMMA)) {
+          if (cfCq.contains(COLON)) {
+            String[] c = cfCq.split(COLON);
             columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(c[0]),
new Text(c[1])));
           } else {
             columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text(cfCq),
null));
@@ -175,7 +190,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     } catch (Exception e) {
       throw new IOException(
           "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&"
-              + "[start=startRow,end=endRow,columns=[cf1|cq1,cf2|cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]':
"
+              + "[start=startRow,end=endRow,fetch_columns=[cf1:cq1,cf2:cq2,...],write_buffer_size_bytes=10000000,write_threads=10,write_latency_ms=30000]]':
"
               + e.getMessage());
     }
   }
@@ -255,10 +270,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     return new AccumuloOutputFormat();
   }
   
-  public void checkSchema(ResourceSchema schema) throws IOException {
-    // we don't care about types, they all get casted to ByteBuffers
-  }
-  
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void prepareToWrite(RecordWriter writer) {
     this.writer = writer;
@@ -280,4 +291,153 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   public void cleanupOnFailure(String failure, Job job) {}
 
   public void cleanupOnSuccess(String location, Job job) {}
+  
+  /**
+   * Remembers the schema of the data about to be stored and writes its serialized form into
+   * the UDF properties under the key {@code contextSignature + "_schema"}.
+   *
+   * @param s the schema of the relation being stored
+   * @throws IOException if no {@link LoadStoreCaster} is configured, or if the schema cannot
+   *         be serialized
+   */
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+    // 'caster' is declared as LoadStoreCaster, so this test can only fail when it is null.
+    // The error message must therefore not dereference it, otherwise the caller gets a
+    // NullPointerException instead of the intended IOException.
+    if (!(caster instanceof LoadStoreCaster)) {
+      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
+      throw new IOException("Bad Caster " + (null == caster ? null : caster.getClass()));
+    }
+    schema = s;
+    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
+  }
+
+  
+  /**
+   * Converts the field at position {@code i} of {@code tuple} into a {@link Text}.
+   *
+   * @param tuple the tuple to read from
+   * @param i index of the field within the tuple
+   * @param fieldSchemas schema of the tuple's fields, or {@code null} to derive the type from
+   *        the runtime value
+   * @return the field's bytes wrapped in a {@link Text}
+   * @throws IOException if the field cannot be read or converted
+   */
+  protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    final Object field = tuple.get(i);
+    return objToText(field, schemaToType(field, i, fieldSchemas));
+  }
+  
+  /**
+   * Converts a single value into a {@link Text}, taking its Pig type from {@code fieldSchema}
+   * when one is given and from the runtime value otherwise.
+   *
+   * @param o the value to convert
+   * @param fieldSchema schema of the value, or {@code null}
+   * @return the value's bytes wrapped in a {@link Text}
+   * @throws IOException if the value cannot be converted
+   */
+  protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException {
+    return objToText(o, schemaToType(o, fieldSchema));
+  }
+  
+  /**
+   * Determines the Pig type code for a value.
+   *
+   * @param o the value whose type is wanted
+   * @param fieldSchema schema of the value, or {@code null}
+   * @return the schema's declared type, or the result of {@link DataType#findType(Object)} when
+   *         no schema is available
+   */
+  protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) {
+    if (null != fieldSchema) {
+      return fieldSchema.getType();
+    }
+    return DataType.findType(o);
+  }
+  
+  /**
+   * Determines the Pig type code for the {@code i}-th field of a tuple.
+   *
+   * @param o the field's value
+   * @param i index of the field within {@code fieldSchemas}
+   * @param fieldSchemas schemas of all fields, or {@code null}
+   * @return the declared type of field {@code i}, or the result of
+   *         {@link DataType#findType(Object)} when no schemas are available
+   */
+  protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
+    if (null != fieldSchemas) {
+      return fieldSchemas[i].getType();
+    }
+    return DataType.findType(o);
+  }
+  
+  /**
+   * Converts the field at position {@code i} of {@code tuple} into its byte representation.
+   *
+   * @param tuple the tuple to read from
+   * @param i index of the field within the tuple
+   * @param fieldSchemas schema of the tuple's fields, or {@code null} to derive the type from
+   *        the runtime value
+   * @return the serialized field, as produced by {@code objToBytes}
+   * @throws IOException if the field cannot be read or converted
+   */
+  protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    final Object field = tuple.get(i);
+    return objToBytes(field, schemaToType(field, i, fieldSchemas));
+  }
+  
+  /**
+   * Reads the field at position {@code i} of {@code tuple} and converts it to a {@code long}.
+   *
+   * <p>Numeric types are narrowed with {@code longValue()}; a warning is logged when narrowing
+   * a {@link BigInteger} or {@link BigDecimal} changes the value. Chararrays and bytearrays
+   * are parsed with {@link Long#parseLong(String)}.
+   *
+   * @param tuple the tuple to read from
+   * @param i index of the field within the tuple
+   * @param fieldSchemas schema of the tuple's fields, or {@code null} to derive the type from
+   *        the runtime value
+   * @return the field as a long
+   * @throws IOException if the field is null, is of an unsupported type, or holds text that is
+   *         not a valid long
+   */
+  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
+    Object o = tuple.get(i);
+
+    // A null field would otherwise surface as a NullPointerException, either while unboxing
+    // below or while building the error message in the default branch.
+    if (null == o) {
+      final String msg = "Could not convert null value at tuple offset " + i + " into long";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    byte type = schemaToType(o, i, fieldSchemas);
+
+    switch (type) {
+      case DataType.LONG:
+        return (Long) o;
+      case DataType.CHARARRAY:
+        String timestampString = (String) o;
+        try {
+          return Long.parseLong(timestampString);
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast chararray into long: " + timestampString;
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      case DataType.DOUBLE:
+        Double doubleTimestamp = (Double) o;
+        return doubleTimestamp.longValue();
+      case DataType.FLOAT:
+        Float floatTimestamp = (Float) o;
+        return floatTimestamp.longValue();
+      case DataType.INTEGER:
+        Integer intTimestamp = (Integer) o;
+        return intTimestamp.longValue();
+      case DataType.BIGINTEGER:
+        BigInteger bigintTimestamp = (BigInteger) o;
+        long longTimestamp = bigintTimestamp.longValue();
+
+        // Round-trip through BigInteger to detect values that do not fit into a long.
+        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
+
+        if (!recreatedTimestamp.equals(bigintTimestamp)) {
+          LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp);
+        }
+
+        return longTimestamp;
+      case DataType.BIGDECIMAL:
+        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
+        try {
+          return bigdecimalTimestamp.longValueExact();
+        } catch (ArithmeticException e) {
+          // Fall back to the lossy conversion, but tell the user about it.
+          long convertedLong = bigdecimalTimestamp.longValue();
+          LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was " + bigdecimalTimestamp + " but is now " + convertedLong);
+          return convertedLong;
+        }
+      case DataType.BYTEARRAY:
+        DataByteArray bytes = (DataByteArray) o;
+        try {
+          return Long.parseLong(bytes.toString());
+        } catch (NumberFormatException e) {
+          final String msg = "Could not cast bytes into long: " + bytes.toString();
+          LOG.error(msg);
+          throw new IOException(msg, e);
+        }
+      default:
+        LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
+        throw new IOException("Could not convert " + o.getClass() + " into long");
+
+    }
+  }
+  
+  /**
+   * Serializes {@code o} according to the Pig {@code type} code and wraps the bytes in a
+   * {@link Text}.
+   *
+   * @param o the value to convert; may be {@code null}
+   * @param type the Pig {@link DataType} code describing {@code o}
+   * @return a {@link Text} holding the serialized value, or an empty {@link Text} when the
+   *         value serializes to {@code null}
+   * @throws IOException if the value cannot be serialized
+   */
+  protected Text objToText(Object o, byte type) throws IOException {
+    byte[] bytes = objToBytes(o, type);
+
+    // objToBytes yields null for a null object or a NULL-typed field; Text cannot be built
+    // from a null array, so substitute an empty Text rather than failing with an NPE.
+    if (null == bytes) {
+      LOG.warn("Creating empty text from null value");
+      return new Text();
+    }
+
+    return new Text(bytes);
+  }
+  
+  /**
+   * Serializes a value to bytes according to its Pig {@code type} code, delegating to the
+   * configured {@code caster} for everything except bytearrays.
+   *
+   * @param o the value to serialize; may be {@code null}
+   * @param type the Pig {@link DataType} code describing {@code o}
+   * @return the serialized bytes, or {@code null} when {@code o} is null or {@code type} is
+   *         {@link DataType#NULL}
+   * @throws IOException if the type is {@link DataType#ERROR} or has no converter
+   */
+  @SuppressWarnings("unchecked")
+  protected byte[] objToBytes(Object o, byte type) throws IOException {
+    if (null == o) {
+      return null;
+    }
+
+    switch (type) {
+      case DataType.NULL:
+        return null;
+
+      case DataType.BYTEARRAY:
+        // Raw bytes are handed back as-is, without involving the caster.
+        return ((DataByteArray) o).get();
+
+      // Scalar types
+      case DataType.BOOLEAN:
+        return caster.toBytes((Boolean) o);
+      case DataType.INTEGER:
+        return caster.toBytes((Integer) o);
+      case DataType.LONG:
+        return caster.toBytes((Long) o);
+      case DataType.FLOAT:
+        return caster.toBytes((Float) o);
+      case DataType.DOUBLE:
+        return caster.toBytes((Double) o);
+      case DataType.BIGINTEGER:
+        return caster.toBytes((BigInteger) o);
+      case DataType.BIGDECIMAL:
+        return caster.toBytes((BigDecimal) o);
+      case DataType.CHARARRAY:
+        return caster.toBytes((String) o);
+      case DataType.DATETIME:
+        return caster.toBytes((DateTime) o);
+
+      // Complex types
+      case DataType.TUPLE:
+        return caster.toBytes((Tuple) o);
+      case DataType.BAG:
+        return caster.toBytes((DataBag) o);
+      case DataType.MAP:
+        // Unchecked cast: the generic parameters cannot be verified at runtime, so this
+        // relies on DataType.findType having classified the value correctly.
+        return caster.toBytes((Map<String,Object>) o);
+
+      case DataType.ERROR:
+        throw new IOException("Unable to determine type of " + o.getClass());
+      default:
+        throw new IOException("Unable to find a converter for tuple field " + o);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8a17e8b..8462985 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -17,11 +17,8 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -30,18 +27,12 @@ import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
-import org.apache.pig.LoadStoreCaster;
-import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.joda.time.DateTime;
 
 /**
  * A LoadStoreFunc for retrieving data from and storing data to Accumulo.
@@ -60,9 +51,6 @@ import org.joda.time.DateTime;
  */
 public class AccumuloKVStorage extends AbstractAccumuloStorage {
   private static final Log LOG = LogFactory.getLog(AccumuloKVStorage.class);
-  protected LoadStoreCaster caster;
-  
-  private ResourceSchema schema;
   
   public AccumuloKVStorage() {
     this.caster = new Utf8StorageConverter();
@@ -129,142 +117,4 @@ public class AccumuloKVStorage extends AbstractAccumuloStorage {
     return Collections.singleton(mut);
   }
   
-  @Override
-  public void checkSchema(ResourceSchema s) throws IOException {
-    if (!(caster instanceof LoadStoreCaster)) {
-      LOG.error("Caster must implement LoadStoreCaster for writing to Accumulo.");
-      throw new IOException("Bad Caster " + caster.getClass());
-    }
-    schema = s;
-    getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
-  }
-  
-  private Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws
IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToText(o, type);
-  }
-  
-  private byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) {
-    return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-  }
-  
-  private byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws
IOException {
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    return objToBytes(o, type);
-    
-  }
-  
-  private long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException
{ 
-    Object o = tuple.get(i);
-    byte type = schemaToType(o, i, fieldSchemas);
-    
-    switch (type) {
-      case DataType.LONG:
-        return (Long) o;
-      case DataType.CHARARRAY:
-        String timestampString = (String) o;
-        try {
-          return Long.parseLong(timestampString);
-        } catch (NumberFormatException e) {
-          final String msg = "Could not cast chararray into long: " + timestampString;
-          LOG.error(msg);
-          throw new IOException(msg, e);
-        }
-      case DataType.DOUBLE:
-        Double doubleTimestamp = (Double) o;
-        return doubleTimestamp.longValue();
-      case DataType.FLOAT:
-        Float floatTimestamp = (Float) o;
-        return floatTimestamp.longValue();
-      case DataType.INTEGER:
-        Integer intTimestamp = (Integer) o;
-        return intTimestamp.longValue();
-      case DataType.BIGINTEGER:
-        BigInteger bigintTimestamp = (BigInteger) o;
-        long longTimestamp = bigintTimestamp.longValue();
-        
-        BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp);
-        
-        if (!recreatedTimestamp.equals(bigintTimestamp)) {
-          LOG.warn("Downcasting BigInteger into Long results in a change of the original
value. Was " + bigintTimestamp + " but is now " + longTimestamp);
-        }
-        
-        return longTimestamp;
-      case DataType.BIGDECIMAL:
-        BigDecimal bigdecimalTimestamp = (BigDecimal) o;
-        try {
-          return bigdecimalTimestamp.longValueExact();
-        } catch (ArithmeticException e) {
-          long convertedLong = bigdecimalTimestamp.longValue();
-          LOG.warn("Downcasting BigDecimal into Long results in a loss of information. Was
" + bigdecimalTimestamp + " but is now " + convertedLong);
-          return convertedLong;
-        }
-      case DataType.BYTEARRAY:
-        DataByteArray bytes = (DataByteArray) o;
-        try {
-          return Long.parseLong(bytes.toString());
-        } catch (NumberFormatException e) {
-          final String msg = "Could not cast bytes into long: " + bytes.toString();
-          LOG.error(msg);
-          throw new IOException(msg, e);
-        }
-      default:
-        LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long.");
-        throw new IOException("Could not convert " + o.getClass() + " into long");
-        
-    }
-  }
-  
-  private Text objToText(Object o, byte type) throws IOException {
-    return new Text(objToBytes(o, type));
-  }
-  
-  @SuppressWarnings("unchecked")
-  private byte[] objToBytes(Object o, byte type) throws IOException {
-    if (o == null)
-      return null;
-    switch (type) {
-      case DataType.BYTEARRAY:
-        return ((DataByteArray) o).get();
-      case DataType.BAG:
-        return caster.toBytes((DataBag) o);
-      case DataType.CHARARRAY:
-        return caster.toBytes((String) o);
-      case DataType.DOUBLE:
-        return caster.toBytes((Double) o);
-      case DataType.FLOAT:
-        return caster.toBytes((Float) o);
-      case DataType.INTEGER:
-        return caster.toBytes((Integer) o);
-      case DataType.LONG:
-        return caster.toBytes((Long) o);
-      case DataType.BIGINTEGER:
-        return caster.toBytes((BigInteger) o);
-      case DataType.BIGDECIMAL:
-        return caster.toBytes((BigDecimal) o);
-      case DataType.BOOLEAN:
-        return caster.toBytes((Boolean) o);
-      case DataType.DATETIME:
-        return caster.toBytes((DateTime) o);
-        
-        // The type conversion here is unchecked.
-        // Relying on DataType.findType to do the right thing.
-      case DataType.MAP:
-        return caster.toBytes((Map<String,Object>) o);
-        
-      case DataType.NULL:
-        return null;
-      case DataType.TUPLE:
-        return caster.toBytes((Tuple) o);
-      case DataType.ERROR:
-        throw new IOException("Unable to determine type of " + o.getClass());
-      default:
-        throw new IOException("Unable to find a converter for tuple field " + o);
-    }
-  }
-  
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/74c01ec2/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
new file mode 100644
index 0000000..c72f07f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -0,0 +1,134 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Storage function that turns each Pig tuple into a single Accumulo {@link Mutation}.
+ *
+ * <p>The first field of the tuple becomes the row. Every following field is paired, by
+ * position, with an entry of the comma-separated column specification passed to the
+ * constructor:
+ * <ul>
+ * <li>a Map field writes one column per entry: the entry's key is the column qualifier when a
+ * specification is present, and the column family otherwise;</li>
+ * <li>any other field is written to the column named by its specification, either
+ * {@code family} or {@code family:qualifier}; a non-Map field without a specification is
+ * skipped with a warning.</li>
+ * </ul>
+ */
+public class AccumuloStorage extends AbstractAccumuloStorage {
+  private static final Logger log = Logger.getLogger(AccumuloStorage.class);
+  private static final String COMMA = ",", COLON = ":";
+  private static final Text EMPTY_TEXT = new Text(new byte[0]);
+
+  /** Column specifications in tuple order; entry {@code n} applies to tuple field {@code n + 1}. */
+  protected final List<String> columnSpecs;
+
+  /**
+   * @param columns comma-separated column specifications; blank or {@code null} means none
+   */
+  public AccumuloStorage(String columns) {
+    if (!StringUtils.isBlank(columns)) {
+      String[] columnArray = StringUtils.split(columns, COMMA);
+      columnSpecs = Lists.newArrayList(columnArray);
+    } else {
+      columnSpecs = Collections.emptyList();
+    }
+  }
+
+  /**
+   * Not implemented yet: always returns {@code null}.
+   */
+  @Override
+  protected Tuple getTuple(Key key, Value value) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /**
+   * Builds the mutation for one tuple.
+   *
+   * @param tuple the tuple to store; field 0 is the row, the remaining fields are values
+   * @return a single-element collection holding the mutation, or an empty collection when the
+   *         tuple has fewer than two fields or no column could be written
+   * @throws IOException if a field cannot be converted to bytes
+   */
+  @Override
+  public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException {
+    final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+
+    // A row plus at least one value is needed to produce any column update. Check this before
+    // touching the iterator so that an empty tuple never reaches next().
+    if (tuple.size() < 2) {
+      log.debug("Ignoring tuple of size " + tuple.size());
+      return Collections.emptyList();
+    }
+
+    Iterator<Object> tupleIter = tuple.iterator();
+
+    Mutation mutation = new Mutation(objectToText(tupleIter.next(), fieldSchemaAt(fieldSchemas, 0)));
+
+    // TODO Can these be lifted up to members of the class instead of this method?
+    final Text cfHolder = new Text(), cqHolder = new Text();
+
+    // Tuple field n (n >= 1) is described by column specification n - 1.
+    for (int tupleOffset = 1; tupleIter.hasNext(); tupleOffset++) {
+      final Object o = tupleIter.next();
+      final int columnOffset = tupleOffset - 1;
+
+      // Figure out if the user provided a specific column to use.
+      final String cf = (columnOffset < columnSpecs.size()) ? columnSpecs.get(columnOffset) : null;
+
+      // Grab the type for this field; without a schema it is derived from the value itself.
+      final byte type = schemaToType(o, fieldSchemaAt(fieldSchemas, tupleOffset));
+
+      if (DataType.MAP == type) {
+        // Every Entry of a Map becomes its own column in this record.
+        @SuppressWarnings("unchecked")
+        Map<String,Object> map = (Map<String,Object>) o;
+
+        for (Entry<String,Object> entry : map.entrySet()) {
+          Object entryObject = entry.getValue();
+          byte entryType = DataType.findType(entryObject);
+
+          Value value = new Value(objToBytes(entryObject, entryType));
+
+          if (null != cf) {
+            // A CF was provided: use it and push the Map's key down to the CQ.
+            cfHolder.set(cf);
+            cqHolder.set(entry.getKey());
+
+            mutation.put(cfHolder, cqHolder, value);
+          } else {
+            // Just put the Map's key into the CF.
+            cfHolder.set(entry.getKey());
+            mutation.put(cfHolder, EMPTY_TEXT, value);
+          }
+        }
+      } else if (null == cf) {
+        // We don't know what column to place the value into.
+        log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset);
+      } else {
+        Value value = new Value(objToBytes(o, type));
+
+        // Not a Map: the specification names the column, optionally as "family:qualifier".
+        // Split on the String, not on its encoded bytes, so that a char index is never used
+        // as a byte offset; Text performs the UTF-8 encoding itself.
+        int index = cf.indexOf(COLON);
+        if (-1 == index) {
+          cfHolder.set(cf);
+
+          mutation.put(cfHolder, EMPTY_TEXT, value);
+        } else {
+          cfHolder.set(cf.substring(0, index));
+          cqHolder.set(cf.substring(index + 1));
+
+          mutation.put(cfHolder, cqHolder, value);
+        }
+      }
+    }
+
+    if (0 == mutation.size()) {
+      return Collections.emptyList();
+    }
+
+    return Collections.singletonList(mutation);
+  }
+
+  /** Returns the schema of the field at {@code offset}, or {@code null} when no schema is known. */
+  private static ResourceFieldSchema fieldSchemaAt(ResourceFieldSchema[] fieldSchemas, int offset) {
+    return (null == fieldSchemas) ? null : fieldSchemas[offset];
+  }
+}


Mime
View raw message