gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [1/4] gora git commit: Switched Accumulo Dependency to 1.7.1 and ported AccumuloStore Class to work with accumulo 1.7.1
Date Wed, 22 Feb 2017 16:19:21 GMT
Repository: gora
Updated Branches:
  refs/heads/master 560704c3a -> 4bbf52ee7


Switched Accumulo Dependency to 1.7.1 and ported AccumuloStore Class to
    work with accumulo 1.7.1

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ce945da3
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ce945da3
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ce945da3

Branch: refs/heads/master
Commit: ce945da33c42463567c40a2d35fc34a0e23618d6
Parents: 560704c
Author: Vaibhav Thapliyal <vaibhav.thapliyal.91@gmail.com>
Authored: Fri Feb 17 11:39:15 2017 +0530
Committer: vaibhavthapliyal <vaibhav.thapliyal.91@gmail.com>
Committed: Fri Feb 17 11:39:15 2017 +0530

----------------------------------------------------------------------
 gora-accumulo/pom.xml                           |    2 +-
 .../gora/accumulo/store/AccumuloStore.java      | 1886 +++++++++---------
 2 files changed, 956 insertions(+), 932 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/ce945da3/gora-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/gora-accumulo/pom.xml b/gora-accumulo/pom.xml
index bc131fa..e13f7a7 100644
--- a/gora-accumulo/pom.xml
+++ b/gora-accumulo/pom.xml
@@ -50,7 +50,7 @@
   </ciManagement>
 
   <properties>
-    <accumulo.version>1.6.4</accumulo.version>
+    <accumulo.version>1.7.1</accumulo.version>
     <osgi.import>*</osgi.import>
     <osgi.export>org.apache.gora.accumulo*;version="${project.version}";-noimport:=true</osgi.export>
   </properties>

http://git-wip-us.apache.org/repos/asf/gora/blob/ce945da3/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
----------------------------------------------------------------------
diff --git a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
index a68cdaa..a4cddce 100644
--- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
+++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
@@ -50,25 +50,27 @@ import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.mock.MockConnector;
 import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.mock.MockTabletLocator;
+import org.apache.accumulo.core.client.mock.impl.MockTabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.iterators.SortedKeyIterator;
 import org.apache.accumulo.core.iterators.user.TimestampFilter;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.avro.Schema;
@@ -108,933 +110,955 @@ import org.w3c.dom.NodeList;
 /**
  * Implementation of a Accumulo data store to be used by gora.
  *
- * @param <K> class to be used for the key
- * @param <T> class to be persisted within the store
+ * @param <K>
+ *            class to be used for the key
+ * @param <T>
+ *            class to be persisted within the store
  */
-public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T> {
-
-  protected static final String MOCK_PROPERTY = "accumulo.mock";
-  protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
-  protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers";
-  protected static final String USERNAME_PROPERTY = "accumulo.user";
-  protected static final String PASSWORD_PROPERTY = "accumulo.password";
-  protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml";
-
-  private final static String UNKOWN = "Unknown type ";
-
-  private Connector conn;
-  private BatchWriter batchWriter;
-  private AccumuloMapping mapping;
-  private Credentials credentials;
-  private Encoder encoder;
-
-  public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class);
-
-  public Object fromBytes(Schema schema, byte[] data) throws IOException {
-    Schema fromSchema = null;
-    if (schema.getType() == Type.UNION) {
-      try {
-        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
-        int unionIndex = decoder.readIndex();
-        List<Schema> possibleTypes = schema.getTypes();
-        fromSchema = possibleTypes.get(unionIndex);
-        Schema effectiveSchema = possibleTypes.get(unionIndex);
-        if (effectiveSchema.getType() == Type.NULL) {
-          decoder.readNull();
-          return null;
-        } else {
-          data = decoder.readBytes(null).array();
-        }
-      } catch (IOException e) {
-        LOG.error(e.getMessage());
-        throw new GoraException("Error decoding union type: ", e);
-      }
-    } else {
-      fromSchema = schema;
-    }
-    return fromBytes(encoder, fromSchema, data);
-  }
-
-  public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException {
-    switch (schema.getType()) {
-    case BOOLEAN:
-      return encoder.decodeBoolean(data);
-    case DOUBLE:
-      return encoder.decodeDouble(data);
-    case FLOAT:
-      return encoder.decodeFloat(data);
-    case INT:
-      return encoder.decodeInt(data);
-    case LONG:
-      return encoder.decodeLong(data);
-    case STRING:
-      return new Utf8(data);
-    case BYTES:
-      return ByteBuffer.wrap(data);
-    case ENUM:
-      return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
-    case ARRAY:
-      break;
-    case FIXED:
-      break;
-    case MAP:
-      break;
-    case NULL:
-      break;
-    case RECORD:
-      break;
-    case UNION:
-      break;
-    default:
-      break;
-    }
-    throw new IllegalArgumentException(UNKOWN + schema.getType());
-
-  }
-
-  private static byte[] getBytes(Text text) {
-    byte[] bytes = text.getBytes();
-    if (bytes.length != text.getLength()) {
-      bytes = new byte[text.getLength()];
-      System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length);
-    }
-    return bytes;
-  }
-
-  public K fromBytes(Class<K> clazz, byte[] val) {
-    return fromBytes(encoder, clazz, val);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
-    try {
-      if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
-        return (K) Byte.valueOf(encoder.decodeByte(val));
-      } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
-        return (K) Boolean.valueOf(encoder.decodeBoolean(val));
-      } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
-        return (K) Short.valueOf(encoder.decodeShort(val));
-      } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
-        return (K) Integer.valueOf(encoder.decodeInt(val));
-      } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
-        return (K) Long.valueOf(encoder.decodeLong(val));
-      } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
-        return (K) Float.valueOf(encoder.decodeFloat(val));
-      } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
-        return (K) Double.valueOf(encoder.decodeDouble(val));
-      } else if (clazz.equals(String.class)) {
-        return (K) new String(val, "UTF-8");
-      } else if (clazz.equals(Utf8.class)) {
-        return (K) new Utf8(val);
-      }
-
-      throw new IllegalArgumentException(UNKOWN + clazz.getName());
-    } catch (IOException ioe) {
-      LOG.error(ioe.getMessage());
-      throw new RuntimeException(ioe);
-    }
-  }
-
-  private static byte[] copyIfNeeded(byte b[], int offset, int len) {
-    if (len != b.length || offset != 0) {
-      byte[] copy = new byte[len];
-      System.arraycopy(b, offset, copy, 0, copy.length);
-      b = copy;
-    }
-    return b;
-  }
-
-  public byte[] toBytes(Schema toSchema, Object o) {
-    if (toSchema != null && toSchema.getType() == Type.UNION) {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null);
-      int unionIndex = 0;
-      try {
-        if (o == null) {
-          unionIndex = firstNullSchemaTypeIndex(toSchema);
-          avroEncoder.writeIndex(unionIndex);
-          avroEncoder.writeNull();
-        } else {
-          unionIndex = firstNotNullSchemaTypeIndex(toSchema);
-          avroEncoder.writeIndex(unionIndex);
-          avroEncoder.writeBytes(toBytes(o));
-        }
-        avroEncoder.flush();
-        return baos.toByteArray();
-      } catch (IOException e) {
-        LOG.error(e.getMessage());
-        return toBytes(o);
-      }
-    } else {
-      return toBytes(o);
-    }
-  }
-
-  private int firstNullSchemaTypeIndex(Schema toSchema) {
-    List<Schema> possibleTypes = toSchema.getTypes();
-    int unionIndex = 0;
-    for (int i = 0; i < possibleTypes.size(); i++ ) {
-      Type pType = possibleTypes.get(i).getType();
-      if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
-        unionIndex = i; break;
-      }
-    }
-    return unionIndex;
-  }
-
-  private int firstNotNullSchemaTypeIndex(Schema toSchema) {
-    List<Schema> possibleTypes = toSchema.getTypes();
-    int unionIndex = 0;
-    for (int i = 0; i < possibleTypes.size(); i++ ) {
-      Type pType = possibleTypes.get(i).getType();
-      if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
-        unionIndex = i; break;
-      }
-    }
-    return unionIndex;
-  }
-
-  public byte[] toBytes(Object o) {
-    return toBytes(encoder, o);
-  }
-
-  public static byte[] toBytes(Encoder encoder, Object o) {
-
-    try {
-      if (o instanceof String) {
-        return ((String) o).getBytes("UTF-8");
-      } else if (o instanceof Utf8) {
-        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength());
-      } else if (o instanceof ByteBuffer) {
-        return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
-      } else if (o instanceof Long) {
-        return encoder.encodeLong((Long) o);
-      } else if (o instanceof Integer) {
-        return encoder.encodeInt((Integer) o);
-      } else if (o instanceof Short) {
-        return encoder.encodeShort((Short) o);
-      } else if (o instanceof Byte) {
-        return encoder.encodeByte((Byte) o);
-      } else if (o instanceof Boolean) {
-        return encoder.encodeBoolean((Boolean) o);
-      } else if (o instanceof Float) {
-        return encoder.encodeFloat((Float) o);
-      } else if (o instanceof Double) {
-        return encoder.encodeDouble((Double) o);
-      } else if (o instanceof Enum) {
-        return encoder.encodeInt(((Enum<?>) o).ordinal());
-      }
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
-
-    throw new IllegalArgumentException(UNKOWN + o.getClass().getName());
-  }
-
-  private BatchWriter getBatchWriter() throws IOException {
-    if (batchWriter == null)
-      try {
-        BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
-        batchWriterConfig.setMaxMemory(10000000);
-        batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS);
-        batchWriterConfig.setMaxWriteThreads(4);
-        batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig);
-      } catch (TableNotFoundException e) {
-        throw new IOException(e);
-      }
-    return batchWriter;
-  }
-
-  /**
-   * Initialize the data store by reading the credentials, setting the client's properties up and
-   * reading the mapping file. Initialize is called when then the call to
-   * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made.
-   *
-   * @param keyClass
-   * @param persistentClass
-   * @param properties
-   */
-  @Override
-  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
-    try{
-      super.initialize(keyClass, persistentClass, properties);
-
-      String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
-      String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
-      String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
-      String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
-
-      mapping = readMapping(mappingFile);
-
-      if (mapping.encoder == null || "".equals(mapping.encoder)) {
-        encoder = new BinaryEncoder();
-      } else {
-        try {
-          encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
-        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
-          throw new IOException(e);
-        }
-      }
-
-      try {
-        AuthenticationToken token = new PasswordToken(password);
-        if (mock == null || !mock.equals("true")) {
-          String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
-          String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
-          conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token);
-        } else {
-          conn = new MockInstance().getConnector(user, token);
-        }
-        credentials = new Credentials(user, token);
-
-        if (autoCreateSchema && !schemaExists())
-          createSchema();
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        throw new IOException(e);
-      }
-    } catch(IOException e){
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  protected AccumuloMapping readMapping(String filename) throws IOException {
-    try {
-
-      AccumuloMapping mapping = new AccumuloMapping();
-
-      DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
-      Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
-
-      Element root = dom.getDocumentElement();
-
-      NodeList nl = root.getElementsByTagName("class");
-      for (int i = 0; i < nl.getLength(); i++) {
-
-        Element classElement = (Element) nl.item(i);
-        if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
-            && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
-
-          mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass);
-          mapping.encoder = classElement.getAttribute("encoder");
-
-          NodeList fields = classElement.getElementsByTagName("field");
-          for (int j = 0; j < fields.getLength(); j++) {
-            Element fieldElement = (Element) fields.item(j);
-
-            String name = fieldElement.getAttribute("name");
-            String family = fieldElement.getAttribute("family");
-            String qualifier = fieldElement.getAttribute("qualifier");
-            if ("".equals(qualifier))
-              qualifier = null;
-
-            Pair<Text,Text> col = new Pair<>(new Text(family), qualifier == null ? null : new Text(qualifier));
-            mapping.fieldMap.put(name, col);
-            mapping.columnMap.put(col, name);
-          }
-        }
-
-      }
-
-      if (mapping.tableName == null) {
-        throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for " + persistentClass.getCanonicalName());
-      }
-
-      nl = root.getElementsByTagName("table");
-      for (int i = 0; i < nl.getLength(); i++) {
-        Element tableElement = (Element) nl.item(i);
-        if (tableElement.getAttribute("name").equals(mapping.tableName)) {
-          NodeList configs = tableElement.getElementsByTagName("config");
-          for (int j = 0; j < configs.getLength(); j++) {
-            Element configElement = (Element) configs.item(j);
-            String key = configElement.getAttribute("key");
-            String val = configElement.getAttribute("value");
-            mapping.tableConfig.put(key, val);
-          }
-        }
-      }
-
-      return mapping;
-    } catch (Exception ex) {
-      throw new IOException("Unable to read " + filename, ex);
-    }
-
-  }
-
-  @Override
-  public String getSchemaName() {
-    return mapping.tableName;
-  }
-
-  @Override
-  public void createSchema() {
-    try {
-      conn.tableOperations().create(mapping.tableName);
-      Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
-      for (Entry<String,String> entry : es) {
-        conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue());
-      }
-
-    } catch (AccumuloException | AccumuloSecurityException e) {
-      LOG.error(e.getMessage(), e);
-    } catch (TableExistsException e) {
-      LOG.debug(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void deleteSchema() {
-    try {
-      if (batchWriter != null)
-        batchWriter.close();
-      batchWriter = null;
-      conn.tableOperations().delete(mapping.tableName);
-    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public boolean schemaExists() {
-    return conn.tableOperations().exists(mapping.tableName);
-  }
-
-  public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException {
-    ByteSequence row = null;
-
-    Map<Utf8, Object> currentMap = null;
-    List currentArray = null;
-    Text currentFam = null;
-    int currentPos = 0;
-    Schema currentSchema = null;
-    Field currentField = null;
-
-    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
-
-    while (iter.hasNext()) {
-      Entry<Key,Value> entry = iter.next();
-
-      if (row == null) {
-        row = entry.getKey().getRowData();
-      }
-      byte[] val = entry.getValue().get();
-
-      Field field = fieldMap.get(getFieldName(entry));
-
-      if (currentMap != null) {
-        if (currentFam.equals(entry.getKey().getColumnFamily())) {
-          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()),
-              fromBytes(currentSchema, entry.getValue().get()));
-          continue;
-        } else {
-          persistent.put(currentPos, currentMap);
-          currentMap = null;
-        }
-      } else if (currentArray != null) {
-        if (currentFam.equals(entry.getKey().getColumnFamily())) {
-          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
-          continue;
-        } else {
-          persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
-          currentArray = null;
-        }
-      }
-
-      switch (field.schema().getType()) {
-      case MAP:  // first entry only. Next are handled above on the next loop
-        currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
-        currentPos = field.pos();
-        currentFam = entry.getKey().getColumnFamily();
-        currentSchema = field.schema().getValueType();
-
-        currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()),
-            fromBytes(currentSchema, entry.getValue().get()));
-        break;
-      case ARRAY:
-        currentArray = new DirtyListWrapper<>(new ArrayList<>());
-        currentPos = field.pos();
-        currentFam = entry.getKey().getColumnFamily();
-        currentSchema = field.schema().getElementType();
-        currentField = field;
-
-        currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
-
-        break;
-      case UNION:// default value of null acts like union with null
-        Schema effectiveSchema = field.schema().getTypes()
-        .get(firstNotNullSchemaTypeIndex(field.schema()));
-        // map and array were coded without union index so need to be read the same way
-        if (effectiveSchema.getType() == Type.ARRAY) {
-          currentArray = new DirtyListWrapper<>(new ArrayList<>());
-          currentPos = field.pos();
-          currentFam = entry.getKey().getColumnFamily();
-          currentSchema = field.schema().getElementType();
-          currentField = field;
-
-          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
-          break;
-        }
-        else if (effectiveSchema.getType() == Type.MAP) {
-          currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
-          currentPos = field.pos();
-          currentFam = entry.getKey().getColumnFamily();
-          currentSchema = effectiveSchema.getValueType();
-
-          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()),
-              fromBytes(currentSchema, entry.getValue().get()));
-          break;
-        }
-        // continue like a regular top-level union
-      case RECORD:
-        SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema());
-        persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder)));
-        break;
-      default:
-        persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
-      }
-    }
-
-    if (currentMap != null) {
-      persistent.put(currentPos, currentMap);
-    } else if (currentArray != null) {
-      persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
-    }
-
-    persistent.clearDirty();
-
-    return row;
-  }
-
-  /**
-   * Retrieve field name from entry.
-   * @param entry The Key-Value entry
-   * @return String The field name
-   */
-  private String getFieldName(Entry<Key, Value> entry) {
-    String fieldName = mapping.columnMap.get(new Pair<>(entry.getKey().getColumnFamily(),
-        entry.getKey().getColumnQualifier()));
-    if (fieldName == null) {
-      fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
-    }
-    return fieldName;
-  }
-
-  private void setFetchColumns(Scanner scanner, String[] fields) {
-    fields = getFieldsToQuery(fields);
-    for (String field : fields) {
-      Pair<Text,Text> col = mapping.fieldMap.get(field);
-      if (col != null) {
-        if (col.getSecond() == null) {
-          scanner.fetchColumnFamily(col.getFirst());
-        } else {
-          scanner.fetchColumn(col.getFirst(), col.getSecond());
-        }
-      } else {
-        LOG.error("Mapping not found for field: {}", field);
-      }
-    }
-  }
-
-  @Override
-  public T get(K key, String[] fields) {
-    try {
-      // TODO make isolated scanner optional?
-      Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
-      Range rowRange = new Range(new Text(toBytes(key)));
-
-      scanner.setRange(rowRange);
-      setFetchColumns(scanner, fields);
-
-      T persistent = newPersistent();
-      ByteSequence row = populate(scanner.iterator(), persistent);
-      if (row == null)
-        return null;
-      return persistent;
-    } catch (TableNotFoundException e) {
-      LOG.error(e.getMessage(), e);
-      return null;
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-      return null;
-    }
-  }
-
-  @Override
-  public void put(K key, T val) {
-
-    try{
-      Mutation m = new Mutation(new Text(toBytes(key)));
-
-      Schema schema = val.getSchema();
-      List<Field> fields = schema.getFields();
-      int count = 0;
-
-      for (int i = 0; i < fields.size(); i++) {
-        if (!val.isDirty(i)) {
-          continue;
-        }
-        Field field = fields.get(i);
-
-        Object o = val.get(field.pos());
-
-        Pair<Text,Text> col = mapping.fieldMap.get(field.name());
-
-        if (col == null) {
-          throw new GoraException("Please define the gora to accumulo mapping for field " + field.name());
-        }
-
-        switch (field.schema().getType()) {
-        case MAP:
-          count = putMap(m, count, field.schema().getValueType(), o, col, field.name());
-          break;
-        case ARRAY:
-          count = putArray(m, count, o, col, field.name());
-          break;
-        case UNION: // default value of null acts like union with null
-          Schema effectiveSchema = field.schema().getTypes()
-          .get(firstNotNullSchemaTypeIndex(field.schema()));
-          // map and array need to compute qualifier
-          if (effectiveSchema.getType() == Type.ARRAY) {
-            count = putArray(m, count, o, col, field.name());
-            break;
-          }
-          else if (effectiveSchema.getType() == Type.MAP) {
-            count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name());
-            break;
-          }
-          // continue like a regular top-level union
-        case RECORD:
-          final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema());
-          final byte[] byteData = IOUtils.serialize(writer,o);
-          m.put(col.getFirst(), col.getSecond(), new Value(byteData));
-          count++;
-          break;
-        default:
-          m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
-          count++;
-        }
-
-      }
-
-      if (count > 0)
-        try {
-          getBatchWriter().addMutation(m);
-        } catch (MutationsRejectedException e) {
-          LOG.error(e.getMessage(), e);
-        }
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName) throws GoraException {
-
-    // First of all we delete map field on accumulo store
-    Text rowKey = new Text(m.getRow());
-    Query<K, T> query = newQuery();
-    query.setFields(fieldName);
-    query.setStartKey((K)rowKey.toString());
-    query.setEndKey((K)rowKey.toString());
-    deleteByQuery(query);
-    flush();
-    if (o == null){
-      return 0;
-    }
-
-    Set<?> es = ((Map<?, ?>)o).entrySet();
-    for (Object entry : es) {
-      Object mapKey = ((Entry<?, ?>) entry).getKey();
-      Object mapVal = ((Entry<?, ?>) entry).getValue();
-      if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>)o).isDirty())
-          || !(o instanceof DirtyMapWrapper)) {
-        m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal)));
-        count++;
-      }
-      // TODO map value deletion
-    }
-    return count;
-  }
-
-  private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) {
-
-    // First of all we delete array field on accumulo store
-    Text rowKey = new Text(m.getRow());
-    Query<K, T> query = newQuery();
-    query.setFields(fieldName);
-    query.setStartKey((K)rowKey.toString());
-    query.setEndKey((K)rowKey.toString());
-    deleteByQuery(query);
-    flush();
-    if (o == null){
-      return 0;
-    }
-
-    List<?> array = (List<?>) o;  // both GenericArray and DirtyListWrapper
-    int j = 0;
-    for (Object item : array) {
-      m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
-      count++;
-    }
-    return count;
-  }
-
-  @Override
-  public boolean delete(K key) {
-    Query<K,T> q = newQuery();
-    q.setKey(key);
-    return deleteByQuery(q) > 0;
-  }
-
-  @Override
-  public long deleteByQuery(Query<K,T> query) {
-    try {
-      Scanner scanner = createScanner(query);
-      // add iterator that drops values on the server side
-      scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
-      RowIterator iterator = new RowIterator(scanner.iterator());
-
-      long count = 0;
-
-      while (iterator.hasNext()) {
-        Iterator<Entry<Key,Value>> row = iterator.next();
-        Mutation m = null;
-        while (row.hasNext()) {
-          Entry<Key,Value> entry = row.next();
-          Key key = entry.getKey();
-          if (m == null)
-            m = new Mutation(key.getRow());
-          // TODO optimize to avoid continually creating column vis? prob does not matter for empty
-          m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
-        }
-        getBatchWriter().addMutation(m);
-        count++;
-      }
-
-      return count;
-    } catch (TableNotFoundException e) {
-      // TODO return 0?
-      LOG.error(e.getMessage(), e);
-      return 0;
-    } catch (MutationsRejectedException e) {
-      LOG.error(e.getMessage(), e);
-      return 0;
-    } catch (IOException e){
-      LOG.error(e.getMessage(), e);
-      return 0;
-    }
-  }
-
-  private Range createRange(Query<K,T> query) {
-    Text startRow = null;
-    Text endRow = null;
-
-    if (query.getStartKey() != null)
-      startRow = new Text(toBytes(query.getStartKey()));
-
-    if (query.getEndKey() != null)
-      endRow = new Text(toBytes(query.getEndKey()));
-
-    return new Range(startRow, true, endRow, true);
-
-  }
-
-  private Scanner createScanner(Query<K,T> query) throws TableNotFoundException {
-    // TODO make isolated scanner optional?
-    Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
-    setFetchColumns(scanner, query.getFields());
-
-    scanner.setRange(createRange(query));
-
-    if (query.getStartTime() != -1 || query.getEndTime() != -1) {
-      IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
-      if (query.getStartTime() != -1)
-        TimestampFilter.setStart(is, query.getStartTime(), true);
-      if (query.getEndTime() != -1)
-        TimestampFilter.setEnd(is, query.getEndTime(), true);
-
-      scanner.addScanIterator(is);
-    }
-
-    return scanner;
-  }
-
-  /**
-   * Execute the query and return the result.
-   */
-  @Override
-  public Result<K,T> execute(Query<K,T> query) {
-    try {
-      Scanner scanner = createScanner(query);
-      return new AccumuloResult<>(this, query, scanner);
-    } catch (TableNotFoundException e) {
-      // TODO return empty result?
-      LOG.error(e.getMessage(), e);
-      return null;
-    }
-  }
-
-  @Override
-  public Query<K,T> newQuery() {
-    return new AccumuloQuery<>(this);
-  }
-
-  Text pad(Text key, int bytes) {
-    if (key.getLength() < bytes)
-      key = new Text(key);
-
-    while (key.getLength() < bytes) {
-      key.append(new byte[] {0}, 0, 1);
-    }
-
-    return key;
-  }
-
-  @Override
-  public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException {
-    try {
-      TabletLocator tl;
-      if (conn instanceof MockConnector)
-        tl = new MockTabletLocator();
-      else
-        tl = TabletLocator.getLocator(conn.getInstance(), new Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
-
-      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
-
-      tl.invalidateCache();
-      while (tl.binRanges(credentials, Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
-        // TODO log?
-        if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)))
-          throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
-        else if (Tables.getTableState(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
-          throw new TableOfflineException(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName));
-        UtilWaitThread.sleep(100);
-        tl.invalidateCache();
-      }
-
-      List<PartitionQuery<K,T>> ret = new ArrayList<>();
-
-      Text startRow = null;
-      Text endRow = null;
-      if (query.getStartKey() != null)
-        startRow = new Text(toBytes(query.getStartKey()));
-      if (query.getEndKey() != null)
-        endRow = new Text(toBytes(query.getEndKey()));
-
-      //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert
-      HashMap<String,String> hostNameCache = new HashMap<>();
-
-      for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
-        String ip = entry.getKey().split(":", 2)[0];
-        String location = hostNameCache.get(ip);
-        if (location == null) {
-          InetAddress inetAddress = InetAddress.getByName(ip);
-          location = inetAddress.getHostName();
-          hostNameCache.put(ip, location);
-        }
-
-        Map<KeyExtent,List<Range>> tablets = entry.getValue();
-        for (KeyExtent ke : tablets.keySet()) {
-
-          K startKey = null;
-          if (startRow == null || !ke.contains(startRow)) {
-            if (ke.getPrevEndRow() != null) {
-              startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow()));
-            }
-          } else {
-            startKey = fromBytes(getKeyClass(), getBytes(startRow));
-          }
-
-          K endKey = null;
-          if (endRow == null || !ke.contains(endRow)) {
-            if (ke.getEndRow() != null)
-              endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow()));
-          } else {
-            endKey = fromBytes(getKeyClass(), getBytes(endRow));
-          }
-
-          PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location);
-          pqi.setConf(getConf());
-          ret.add(pqi);
-        }
-      }
-
-      return ret;
-    } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
-      throw new IOException(e);
-    }
-
-  }
-
-  static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
-
-    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
-      throw new UnsupportedOperationException();
-    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
-      throw new UnsupportedOperationException();
-    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
-      return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er));
-    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
-      return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
-    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
-      return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
-    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
-      return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
-    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
-      return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
-    } else if (clazz.equals(String.class)) {
-      throw new UnsupportedOperationException();
-    } else if (clazz.equals(Utf8.class)) {
-      return fromBytes(encoder, clazz, er);
-    }
-
-    throw new IllegalArgumentException(UNKOWN + clazz.getName());
-  }
-
-  @SuppressWarnings("unchecked")
-  static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
-
-    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
-      return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
-    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
-      throw new UnsupportedOperationException();
-    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
-      return fromBytes(encoder, clazz, encoder.followingKey(2, per));
-    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
-      return fromBytes(encoder, clazz, encoder.followingKey(4, per));
-    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
-      return fromBytes(encoder, clazz, encoder.followingKey(8, per));
-    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
-      return fromBytes(encoder, clazz, encoder.followingKey(4, per));
-    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
-      return fromBytes(encoder, clazz, encoder.followingKey(8, per));
-    } else if (clazz.equals(String.class)) {
-      throw new UnsupportedOperationException();
-    } else if (clazz.equals(Utf8.class)) {
-      return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
-    }
-
-    throw new IllegalArgumentException(UNKOWN + clazz.getName());
-  }
-
-  @Override
-  public void flush() {
-    try {
-      if (batchWriter != null) {
-        batchWriter.flush();
-      }
-    } catch (MutationsRejectedException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (batchWriter != null) {
-        batchWriter.close();
-        batchWriter = null;
-      }
-    } catch (MutationsRejectedException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
+public class AccumuloStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
+
+	protected static final String MOCK_PROPERTY = "accumulo.mock";
+	protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
+	protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers";
+	protected static final String USERNAME_PROPERTY = "accumulo.user";
+	protected static final String PASSWORD_PROPERTY = "accumulo.password";
+	protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml";
+
+	private final static String UNKOWN = "Unknown type ";
+
+	private Connector conn;
+	private BatchWriter batchWriter;
+	private AccumuloMapping mapping;
+	private Credentials credentials;
+	private Encoder encoder;
+
+	public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class);
+
+	public Object fromBytes(Schema schema, byte[] data) throws IOException {
+		Schema fromSchema = null;
+		if (schema.getType() == Type.UNION) {
+			try {
+				Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+				int unionIndex = decoder.readIndex();
+				List<Schema> possibleTypes = schema.getTypes();
+				fromSchema = possibleTypes.get(unionIndex);
+				Schema effectiveSchema = possibleTypes.get(unionIndex);
+				if (effectiveSchema.getType() == Type.NULL) {
+					decoder.readNull();
+					return null;
+				} else {
+					data = decoder.readBytes(null).array();
+				}
+			} catch (IOException e) {
+				LOG.error(e.getMessage());
+				throw new GoraException("Error decoding union type: ", e);
+			}
+		} else {
+			fromSchema = schema;
+		}
+		return fromBytes(encoder, fromSchema, data);
+	}
+
+	public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException {
+		switch (schema.getType()) {
+		case BOOLEAN:
+			return encoder.decodeBoolean(data);
+		case DOUBLE:
+			return encoder.decodeDouble(data);
+		case FLOAT:
+			return encoder.decodeFloat(data);
+		case INT:
+			return encoder.decodeInt(data);
+		case LONG:
+			return encoder.decodeLong(data);
+		case STRING:
+			return new Utf8(data);
+		case BYTES:
+			return ByteBuffer.wrap(data);
+		case ENUM:
+			return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+		case ARRAY:
+			break;
+		case FIXED:
+			break;
+		case MAP:
+			break;
+		case NULL:
+			break;
+		case RECORD:
+			break;
+		case UNION:
+			break;
+		default:
+			break;
+		}
+		throw new IllegalArgumentException(UNKOWN + schema.getType());
+
+	}
+
+	private static byte[] getBytes(Text text) {
+		byte[] bytes = text.getBytes();
+		if (bytes.length != text.getLength()) {
+			bytes = new byte[text.getLength()];
+			System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length);
+		}
+		return bytes;
+	}
+
+	public K fromBytes(Class<K> clazz, byte[] val) {
+		return fromBytes(encoder, clazz, val);
+	}
+
+	@SuppressWarnings("unchecked")
+	public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
+		try {
+			if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+				return (K) Byte.valueOf(encoder.decodeByte(val));
+			} else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+				return (K) Boolean.valueOf(encoder.decodeBoolean(val));
+			} else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+				return (K) Short.valueOf(encoder.decodeShort(val));
+			} else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+				return (K) Integer.valueOf(encoder.decodeInt(val));
+			} else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+				return (K) Long.valueOf(encoder.decodeLong(val));
+			} else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+				return (K) Float.valueOf(encoder.decodeFloat(val));
+			} else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+				return (K) Double.valueOf(encoder.decodeDouble(val));
+			} else if (clazz.equals(String.class)) {
+				return (K) new String(val, "UTF-8");
+			} else if (clazz.equals(Utf8.class)) {
+				return (K) new Utf8(val);
+			}
+
+			throw new IllegalArgumentException(UNKOWN + clazz.getName());
+		} catch (IOException ioe) {
+			LOG.error(ioe.getMessage());
+			throw new RuntimeException(ioe);
+		}
+	}
+
+	private static byte[] copyIfNeeded(byte b[], int offset, int len) {
+		if (len != b.length || offset != 0) {
+			byte[] copy = new byte[len];
+			System.arraycopy(b, offset, copy, 0, copy.length);
+			b = copy;
+		}
+		return b;
+	}
+
+	public byte[] toBytes(Schema toSchema, Object o) {
+		if (toSchema != null && toSchema.getType() == Type.UNION) {
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null);
+			int unionIndex = 0;
+			try {
+				if (o == null) {
+					unionIndex = firstNullSchemaTypeIndex(toSchema);
+					avroEncoder.writeIndex(unionIndex);
+					avroEncoder.writeNull();
+				} else {
+					unionIndex = firstNotNullSchemaTypeIndex(toSchema);
+					avroEncoder.writeIndex(unionIndex);
+					avroEncoder.writeBytes(toBytes(o));
+				}
+				avroEncoder.flush();
+				return baos.toByteArray();
+			} catch (IOException e) {
+				LOG.error(e.getMessage());
+				return toBytes(o);
+			}
+		} else {
+			return toBytes(o);
+		}
+	}
+
+	private int firstNullSchemaTypeIndex(Schema toSchema) {
+		List<Schema> possibleTypes = toSchema.getTypes();
+		int unionIndex = 0;
+		for (int i = 0; i < possibleTypes.size(); i++) {
+			Type pType = possibleTypes.get(i).getType();
+			if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
+				unionIndex = i;
+				break;
+			}
+		}
+		return unionIndex;
+	}
+
+	private int firstNotNullSchemaTypeIndex(Schema toSchema) {
+		List<Schema> possibleTypes = toSchema.getTypes();
+		int unionIndex = 0;
+		for (int i = 0; i < possibleTypes.size(); i++) {
+			Type pType = possibleTypes.get(i).getType();
+			if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
+				unionIndex = i;
+				break;
+			}
+		}
+		return unionIndex;
+	}
+
+	public byte[] toBytes(Object o) {
+		return toBytes(encoder, o);
+	}
+
+	public static byte[] toBytes(Encoder encoder, Object o) {
+
+		try {
+			if (o instanceof String) {
+				return ((String) o).getBytes("UTF-8");
+			} else if (o instanceof Utf8) {
+				return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength());
+			} else if (o instanceof ByteBuffer) {
+				return copyIfNeeded(((ByteBuffer) o).array(),
+						((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
+			} else if (o instanceof Long) {
+				return encoder.encodeLong((Long) o);
+			} else if (o instanceof Integer) {
+				return encoder.encodeInt((Integer) o);
+			} else if (o instanceof Short) {
+				return encoder.encodeShort((Short) o);
+			} else if (o instanceof Byte) {
+				return encoder.encodeByte((Byte) o);
+			} else if (o instanceof Boolean) {
+				return encoder.encodeBoolean((Boolean) o);
+			} else if (o instanceof Float) {
+				return encoder.encodeFloat((Float) o);
+			} else if (o instanceof Double) {
+				return encoder.encodeDouble((Double) o);
+			} else if (o instanceof Enum) {
+				return encoder.encodeInt(((Enum<?>) o).ordinal());
+			}
+		} catch (IOException ioe) {
+			throw new RuntimeException(ioe);
+		}
+
+		throw new IllegalArgumentException(UNKOWN + o.getClass().getName());
+	}
+
+	private BatchWriter getBatchWriter() throws IOException {
+		if (batchWriter == null)
+			try {
+				BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+				batchWriterConfig.setMaxMemory(10000000);
+				batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS);
+				batchWriterConfig.setMaxWriteThreads(4);
+				batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig);
+			} catch (TableNotFoundException e) {
+				throw new IOException(e);
+			}
+		return batchWriter;
+	}
+
+	/**
+	 * Initialize the data store by reading the credentials, setting the
+	 * client's properties up and reading the mapping file. Initialize is called
+	 * when then the call to
+	 * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made.
+	 *
+	 * @param keyClass
+	 * @param persistentClass
+	 * @param properties
+	 */
+	@Override
+	public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+		try {
+			super.initialize(keyClass, persistentClass, properties);
+
+			String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
+			String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
+			String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
+			String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
+
+			mapping = readMapping(mappingFile);
+
+			if (mapping.encoder == null || "".equals(mapping.encoder)) {
+				encoder = new BinaryEncoder();
+			} else {
+				try {
+					encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
+				} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+					throw new IOException(e);
+				}
+			}
+
+			try {
+				AuthenticationToken token = new PasswordToken(password);
+				if (mock == null || !mock.equals("true")) {
+					String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
+					String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
+					conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token);
+				} else {
+					conn = new MockInstance().getConnector(user, token);
+				}
+				credentials = new Credentials(user, token);
+
+				if (autoCreateSchema && !schemaExists())
+					createSchema();
+			} catch (AccumuloException | AccumuloSecurityException e) {
+				throw new IOException(e);
+			}
+		} catch (IOException e) {
+			LOG.error(e.getMessage(), e);
+		}
+	}
+
+	protected AccumuloMapping readMapping(String filename) throws IOException {
+		try {
+
+			AccumuloMapping mapping = new AccumuloMapping();
+
+			DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+			Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
+
+			Element root = dom.getDocumentElement();
+
+			NodeList nl = root.getElementsByTagName("class");
+			for (int i = 0; i < nl.getLength(); i++) {
+
+				Element classElement = (Element) nl.item(i);
+				if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
+						&& classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
+
+					mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass);
+					mapping.encoder = classElement.getAttribute("encoder");
+
+					NodeList fields = classElement.getElementsByTagName("field");
+					for (int j = 0; j < fields.getLength(); j++) {
+						Element fieldElement = (Element) fields.item(j);
+
+						String name = fieldElement.getAttribute("name");
+						String family = fieldElement.getAttribute("family");
+						String qualifier = fieldElement.getAttribute("qualifier");
+						if ("".equals(qualifier))
+							qualifier = null;
+
+						Pair<Text, Text> col = new Pair<>(new Text(family),
+								qualifier == null ? null : new Text(qualifier));
+						mapping.fieldMap.put(name, col);
+						mapping.columnMap.put(col, name);
+					}
+				}
+
+			}
+
+			if (mapping.tableName == null) {
+				throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for "
+						+ persistentClass.getCanonicalName());
+			}
+
+			nl = root.getElementsByTagName("table");
+			for (int i = 0; i < nl.getLength(); i++) {
+				Element tableElement = (Element) nl.item(i);
+				if (tableElement.getAttribute("name").equals(mapping.tableName)) {
+					NodeList configs = tableElement.getElementsByTagName("config");
+					for (int j = 0; j < configs.getLength(); j++) {
+						Element configElement = (Element) configs.item(j);
+						String key = configElement.getAttribute("key");
+						String val = configElement.getAttribute("value");
+						mapping.tableConfig.put(key, val);
+					}
+				}
+			}
+
+			return mapping;
+		} catch (Exception ex) {
+			throw new IOException("Unable to read " + filename, ex);
+		}
+
+	}
+
+	@Override
+	public String getSchemaName() {
+		return mapping.tableName;
+	}
+
+	@Override
+	public void createSchema() {
+		try {
+			conn.tableOperations().create(mapping.tableName);
+			Set<Entry<String, String>> es = mapping.tableConfig.entrySet();
+			for (Entry<String, String> entry : es) {
+				conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue());
+			}
+
+		} catch (AccumuloException | AccumuloSecurityException e) {
+			LOG.error(e.getMessage(), e);
+		} catch (TableExistsException e) {
+			LOG.debug(e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public void deleteSchema() {
+		try {
+			if (batchWriter != null)
+				batchWriter.close();
+			batchWriter = null;
+			conn.tableOperations().delete(mapping.tableName);
+		} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+			LOG.error(e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public boolean schemaExists() {
+		return conn.tableOperations().exists(mapping.tableName);
+	}
+
+	public ByteSequence populate(Iterator<Entry<Key, Value>> iter, T persistent) throws IOException {
+		ByteSequence row = null;
+
+		Map<Utf8, Object> currentMap = null;
+		List currentArray = null;
+		Text currentFam = null;
+		int currentPos = 0;
+		Schema currentSchema = null;
+		Field currentField = null;
+
+		BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
+
+		while (iter.hasNext()) {
+			Entry<Key, Value> entry = iter.next();
+
+			if (row == null) {
+				row = entry.getKey().getRowData();
+			}
+			byte[] val = entry.getValue().get();
+
+			Field field = fieldMap.get(getFieldName(entry));
+
+			if (currentMap != null) {
+				if (currentFam.equals(entry.getKey().getColumnFamily())) {
+					currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()),
+							fromBytes(currentSchema, entry.getValue().get()));
+					continue;
+				} else {
+					persistent.put(currentPos, currentMap);
+					currentMap = null;
+				}
+			} else if (currentArray != null) {
+				if (currentFam.equals(entry.getKey().getColumnFamily())) {
+					currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+					continue;
+				} else {
+					persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
+					currentArray = null;
+				}
+			}
+
+			switch (field.schema().getType()) {
+			case MAP: // first entry only. Next are handled above on the next
+						// loop
+				currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
+				currentPos = field.pos();
+				currentFam = entry.getKey().getColumnFamily();
+				currentSchema = field.schema().getValueType();
+
+				currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()),
+						fromBytes(currentSchema, entry.getValue().get()));
+				break;
+			case ARRAY:
+				currentArray = new DirtyListWrapper<>(new ArrayList<>());
+				currentPos = field.pos();
+				currentFam = entry.getKey().getColumnFamily();
+				currentSchema = field.schema().getElementType();
+				currentField = field;
+
+				currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+
+				break;
+			case UNION:// default value of null acts like union with null
+				Schema effectiveSchema = field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema()));
+				// map and array were coded without union index so need to be
+				// read the same way
+				if (effectiveSchema.getType() == Type.ARRAY) {
+					currentArray = new DirtyListWrapper<>(new ArrayList<>());
+					currentPos = field.pos();
+					currentFam = entry.getKey().getColumnFamily();
+					currentSchema = field.schema().getElementType();
+					currentField = field;
+
+					currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+					break;
+				} else if (effectiveSchema.getType() == Type.MAP) {
+					currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
+					currentPos = field.pos();
+					currentFam = entry.getKey().getColumnFamily();
+					currentSchema = effectiveSchema.getValueType();
+
+					currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()),
+							fromBytes(currentSchema, entry.getValue().get()));
+					break;
+				}
+				// continue like a regular top-level union
+			case RECORD:
+				SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema());
+				persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder)));
+				break;
+			default:
+				persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
+			}
+		}
+
+		if (currentMap != null) {
+			persistent.put(currentPos, currentMap);
+		} else if (currentArray != null) {
+			persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
+		}
+
+		persistent.clearDirty();
+
+		return row;
+	}
+
+	/**
+	 * Retrieve field name from entry.
+	 * 
+	 * @param entry
+	 *            The Key-Value entry
+	 * @return String The field name
+	 */
+	private String getFieldName(Entry<Key, Value> entry) {
+		String fieldName = mapping.columnMap
+				.get(new Pair<>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()));
+		if (fieldName == null) {
+			fieldName = mapping.columnMap.get(new Pair<Text, Text>(entry.getKey().getColumnFamily(), null));
+		}
+		return fieldName;
+	}
+
+	private void setFetchColumns(Scanner scanner, String[] fields) {
+		fields = getFieldsToQuery(fields);
+		for (String field : fields) {
+			Pair<Text, Text> col = mapping.fieldMap.get(field);
+			if (col != null) {
+				if (col.getSecond() == null) {
+					scanner.fetchColumnFamily(col.getFirst());
+				} else {
+					scanner.fetchColumn(col.getFirst(), col.getSecond());
+				}
+			} else {
+				LOG.error("Mapping not found for field: {}", field);
+			}
+		}
+	}
+
+	@Override
+	public T get(K key, String[] fields) {
+		try {
+			// TODO make isolated scanner optional?
+			Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
+			Range rowRange = new Range(new Text(toBytes(key)));
+
+			scanner.setRange(rowRange);
+			setFetchColumns(scanner, fields);
+
+			T persistent = newPersistent();
+			ByteSequence row = populate(scanner.iterator(), persistent);
+			if (row == null)
+				return null;
+			return persistent;
+		} catch (TableNotFoundException e) {
+			LOG.error(e.getMessage(), e);
+			return null;
+		} catch (IOException e) {
+			LOG.error(e.getMessage(), e);
+			return null;
+		}
+	}
+
+	@Override
+	public void put(K key, T val) {
+
+		try {
+			Mutation m = new Mutation(new Text(toBytes(key)));
+
+			Schema schema = val.getSchema();
+			List<Field> fields = schema.getFields();
+			int count = 0;
+
+			for (int i = 0; i < fields.size(); i++) {
+				if (!val.isDirty(i)) {
+					continue;
+				}
+				Field field = fields.get(i);
+
+				Object o = val.get(field.pos());
+
+				Pair<Text, Text> col = mapping.fieldMap.get(field.name());
+
+				if (col == null) {
+					throw new GoraException("Please define the gora to accumulo mapping for field " + field.name());
+				}
+
+				switch (field.schema().getType()) {
+				case MAP:
+					count = putMap(m, count, field.schema().getValueType(), o, col, field.name());
+					break;
+				case ARRAY:
+					count = putArray(m, count, o, col, field.name());
+					break;
+				case UNION: // default value of null acts like union with null
+					Schema effectiveSchema = field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema()));
+					// map and array need to compute qualifier
+					if (effectiveSchema.getType() == Type.ARRAY) {
+						count = putArray(m, count, o, col, field.name());
+						break;
+					} else if (effectiveSchema.getType() == Type.MAP) {
+						count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name());
+						break;
+					}
+					// continue like a regular top-level union
+				case RECORD:
+					final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema());
+					final byte[] byteData = IOUtils.serialize(writer, o);
+					m.put(col.getFirst(), col.getSecond(), new Value(byteData));
+					count++;
+					break;
+				default:
+					m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
+					count++;
+				}
+
+			}
+
+			if (count > 0)
+				try {
+					getBatchWriter().addMutation(m);
+				} catch (MutationsRejectedException e) {
+					LOG.error(e.getMessage(), e);
+				}
+		} catch (IOException e) {
+			LOG.error(e.getMessage(), e);
+		}
+	}
+
+	private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName)
+			throws GoraException {
+
+		// First of all we delete map field on accumulo store
+		Text rowKey = new Text(m.getRow());
+		Query<K, T> query = newQuery();
+		query.setFields(fieldName);
+		query.setStartKey((K) rowKey.toString());
+		query.setEndKey((K) rowKey.toString());
+		deleteByQuery(query);
+		flush();
+		if (o == null) {
+			return 0;
+		}
+
+		Set<?> es = ((Map<?, ?>) o).entrySet();
+		for (Object entry : es) {
+			Object mapKey = ((Entry<?, ?>) entry).getKey();
+			Object mapVal = ((Entry<?, ?>) entry).getValue();
+			if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>) o).isDirty())
+					|| !(o instanceof DirtyMapWrapper)) {
+				m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal)));
+				count++;
+			}
+			// TODO map value deletion
+		}
+		return count;
+	}
+
+	private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) {
+
+		// First of all we delete array field on accumulo store
+		Text rowKey = new Text(m.getRow());
+		Query<K, T> query = newQuery();
+		query.setFields(fieldName);
+		query.setStartKey((K) rowKey.toString());
+		query.setEndKey((K) rowKey.toString());
+		deleteByQuery(query);
+		flush();
+		if (o == null) {
+			return 0;
+		}
+
+		List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper
+		int j = 0;
+		for (Object item : array) {
+			m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+			count++;
+		}
+		return count;
+	}
+
+	@Override
+	public boolean delete(K key) {
+		Query<K, T> q = newQuery();
+		q.setKey(key);
+		return deleteByQuery(q) > 0;
+	}
+
+	@Override
+	public long deleteByQuery(Query<K, T> query) {
+		try {
+			Scanner scanner = createScanner(query);
+			// add iterator that drops values on the server side
+			scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
+			RowIterator iterator = new RowIterator(scanner.iterator());
+
+			long count = 0;
+
+			while (iterator.hasNext()) {
+				Iterator<Entry<Key, Value>> row = iterator.next();
+				Mutation m = null;
+				while (row.hasNext()) {
+					Entry<Key, Value> entry = row.next();
+					Key key = entry.getKey();
+					if (m == null)
+						m = new Mutation(key.getRow());
+					// TODO optimize to avoid continually creating column vis?
+					// prob does not matter for empty
+					m.putDelete(key.getColumnFamily(), key.getColumnQualifier(),
+							new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
+				}
+				getBatchWriter().addMutation(m);
+				count++;
+			}
+
+			return count;
+		} catch (TableNotFoundException e) {
+			// TODO return 0?
+			LOG.error(e.getMessage(), e);
+			return 0;
+		} catch (MutationsRejectedException e) {
+			LOG.error(e.getMessage(), e);
+			return 0;
+		} catch (IOException e) {
+			LOG.error(e.getMessage(), e);
+			return 0;
+		}
+	}
+
+	private Range createRange(Query<K, T> query) {
+		Text startRow = null;
+		Text endRow = null;
+
+		if (query.getStartKey() != null)
+			startRow = new Text(toBytes(query.getStartKey()));
+
+		if (query.getEndKey() != null)
+			endRow = new Text(toBytes(query.getEndKey()));
+
+		return new Range(startRow, true, endRow, true);
+
+	}
+
+	private Scanner createScanner(Query<K, T> query) throws TableNotFoundException {
+		// TODO make isolated scanner optional?
+		Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
+		setFetchColumns(scanner, query.getFields());
+
+		scanner.setRange(createRange(query));
+
+		if (query.getStartTime() != -1 || query.getEndTime() != -1) {
+			IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
+			if (query.getStartTime() != -1)
+				TimestampFilter.setStart(is, query.getStartTime(), true);
+			if (query.getEndTime() != -1)
+				TimestampFilter.setEnd(is, query.getEndTime(), true);
+
+			scanner.addScanIterator(is);
+		}
+
+		return scanner;
+	}
+
+	/**
+	 * Execute the query and return the result.
+	 */
+	@Override
+	public Result<K, T> execute(Query<K, T> query) {
+		try {
+			Scanner scanner = createScanner(query);
+			return new AccumuloResult<>(this, query, scanner);
+		} catch (TableNotFoundException e) {
+			// TODO return empty result?
+			LOG.error(e.getMessage(), e);
+			return null;
+		}
+	}
+
+	@Override
+	public Query<K, T> newQuery() {
+		return new AccumuloQuery<>(this);
+	}
+
+	Text pad(Text key, int bytes) {
+		if (key.getLength() < bytes)
+			key = new Text(key);
+
+		while (key.getLength() < bytes) {
+			key.append(new byte[] { 0 }, 0, 1);
+		}
+
+		return key;
+	}
+
+	@Override
+	public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+		try {
+			TabletLocator tl;
+			if (conn instanceof MockConnector)
+				tl = new MockTabletLocator();
+			else
+				tl = TabletLocator.getLocator(
+						new ClientContext(conn.getInstance(), credentials,
+								AccumuloConfiguration.getTableConfiguration(conn,
+										Tables.getTableId(conn.getInstance(), mapping.tableName))),
+						new Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
+
+			Map<String, Map<KeyExtent, List<Range>>> binnedRanges = new HashMap<>();
+
+			tl.invalidateCache();
+			while (tl.binRanges(
+					new ClientContext(conn.getInstance(), credentials,
+							AccumuloConfiguration.getTableConfiguration(conn,
+									Tables.getTableId(conn.getInstance(), mapping.tableName))),
+					Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
+				// TODO log?
+				if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)))
+					throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
+				else if (Tables.getTableState(conn.getInstance(),
+						Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
+					throw new TableOfflineException(conn.getInstance(),
+							Tables.getTableId(conn.getInstance(), mapping.tableName));
+				UtilWaitThread.sleep(100);
+				tl.invalidateCache();
+			}
+
+			List<PartitionQuery<K, T>> ret = new ArrayList<>();
+
+			Text startRow = null;
+			Text endRow = null;
+			if (query.getStartKey() != null)
+				startRow = new Text(toBytes(query.getStartKey()));
+			if (query.getEndKey() != null)
+				endRow = new Text(toBytes(query.getEndKey()));
+
+			// hadoop expects hostnames, accumulo keeps track of IPs... so need
+			// to convert
+			HashMap<String, String> hostNameCache = new HashMap<>();
+
+			for (Entry<String, Map<KeyExtent, List<Range>>> entry : binnedRanges.entrySet()) {
+				String ip = entry.getKey().split(":", 2)[0];
+				String location = hostNameCache.get(ip);
+				if (location == null) {
+					InetAddress inetAddress = InetAddress.getByName(ip);
+					location = inetAddress.getHostName();
+					hostNameCache.put(ip, location);
+				}
+
+				Map<KeyExtent, List<Range>> tablets = entry.getValue();
+				for (KeyExtent ke : tablets.keySet()) {
+
+					K startKey = null;
+					if (startRow == null || !ke.contains(startRow)) {
+						if (ke.getPrevEndRow() != null) {
+							startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow()));
+						}
+					} else {
+						startKey = fromBytes(getKeyClass(), getBytes(startRow));
+					}
+
+					K endKey = null;
+					if (endRow == null || !ke.contains(endRow)) {
+						if (ke.getEndRow() != null)
+							endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow()));
+					} else {
+						endKey = fromBytes(getKeyClass(), getBytes(endRow));
+					}
+
+					PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location);
+					pqi.setConf(getConf());
+					ret.add(pqi);
+				}
+			}
+
+			return ret;
+		} catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+			throw new IOException(e);
+		}
+
+	}
+
+	static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
+
+		if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+			throw new UnsupportedOperationException();
+		} else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+			throw new UnsupportedOperationException();
+		} else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+			return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er));
+		} else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+			return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
+		} else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+			return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
+		} else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+			return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
+		} else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+			return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
+		} else if (clazz.equals(String.class)) {
+			throw new UnsupportedOperationException();
+		} else if (clazz.equals(Utf8.class)) {
+			return fromBytes(encoder, clazz, er);
+		}
+
+		throw new IllegalArgumentException(UNKOWN + clazz.getName());
+	}
+
+	@SuppressWarnings("unchecked")
+	static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
+
+		if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+			return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
+		} else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+			throw new UnsupportedOperationException();
+		} else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+			return fromBytes(encoder, clazz, encoder.followingKey(2, per));
+		} else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+			return fromBytes(encoder, clazz, encoder.followingKey(4, per));
+		} else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+			return fromBytes(encoder, clazz, encoder.followingKey(8, per));
+		} else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+			return fromBytes(encoder, clazz, encoder.followingKey(4, per));
+		} else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+			return fromBytes(encoder, clazz, encoder.followingKey(8, per));
+		} else if (clazz.equals(String.class)) {
+			throw new UnsupportedOperationException();
+		} else if (clazz.equals(Utf8.class)) {
+			return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
+		}
+
+		throw new IllegalArgumentException(UNKOWN + clazz.getName());
+	}
+
+	@Override
+	public void flush() {
+		try {
+			if (batchWriter != null) {
+				batchWriter.flush();
+			}
+		} catch (MutationsRejectedException e) {
+			LOG.error(e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public void close() {
+		try {
+			if (batchWriter != null) {
+				batchWriter.close();
+				batchWriter = null;
+			}
+		} catch (MutationsRejectedException e) {
+			LOG.error(e.getMessage(), e);
+		}
+	}
 }


Mime
View raw message