gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject svn commit: r1579741 - in /gora/branches/GORA_94: ./ gora-accumulo/ gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/ gora-accumulo/src/main/java/org/apache/gora/accumulo/query/ gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ g...
Date Thu, 20 Mar 2014 21:13:34 GMT
Author: lewismc
Date: Thu Mar 20 21:13:33 2014
New Revision: 1579741

URL: http://svn.apache.org/r1579741
Log:
GORA-244 Upgrade to Avro 1.7.X in gora-accumulo

Modified:
    gora/branches/GORA_94/gora-accumulo/pom.xml
    gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
    gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
    gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
    gora/branches/GORA_94/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
    gora/branches/GORA_94/gora-accumulo/src/test/resources/gora.properties
    gora/branches/GORA_94/pom.xml

Modified: gora/branches/GORA_94/gora-accumulo/pom.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-accumulo/pom.xml?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-accumulo/pom.xml (original)
+++ gora/branches/GORA_94/gora-accumulo/pom.xml Thu Mar 20 21:13:33 2014
@@ -1,22 +1,17 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-    -->
-    
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+		contributor license agreements. See the NOTICE file distributed with this 
+		work for additional information regarding copyright ownership. The ASF licenses 
+		this file to You under the Apache License, Version 2.0 (the "License"); you 
+		may not use this file except in compliance with the License. You may obtain 
+		a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+		required by applicable law or agreed to in writing, software distributed 
+		under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+		OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+		the specific language governing permissions and limitations under the License. -->
+
 	<modelVersion>4.0.0</modelVersion>
 
 	<parent>
@@ -114,12 +109,12 @@
         <dependency>
            <groupId>org.apache.accumulo</groupId>
            <artifactId>accumulo-core</artifactId>
-           <version>1.4.0</version>
+           <version>1.5.1</version>
         </dependency>
 
         <!-- Hadoop Dependencies -->
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
 
@@ -145,6 +140,12 @@
             <artifactId>hadoop-test</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+           
     </dependencies>
 
 </project>

Modified: gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
(original)
+++ gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
Thu Mar 20 21:13:33 2014
@@ -33,6 +33,7 @@ public class BinaryEncoder implements En
   
   public byte[] encodeShort(short s, byte ret[]) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeShort(s);
       return ret;
@@ -57,6 +58,7 @@ public class BinaryEncoder implements En
   
   public byte[] encodeInt(int i, byte ret[]) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeInt(i);
       return ret;
@@ -81,6 +83,7 @@ public class BinaryEncoder implements En
   
   public byte[] encodeLong(long l, byte ret[]) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeLong(l);
       return ret;
@@ -106,6 +109,7 @@ public class BinaryEncoder implements En
   public byte[] encodeDouble(double d, byte[] ret) {
     try {
       long l = Double.doubleToRawLongBits(d);
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeLong(l);
       return ret;
@@ -131,6 +135,7 @@ public class BinaryEncoder implements En
   public byte[] encodeFloat(float f, byte[] ret) {
     try {
       int i = Float.floatToRawIntBits(f);
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeInt(i);
       return ret;
@@ -177,6 +182,7 @@ public class BinaryEncoder implements En
   
   public byte[] encodeBoolean(boolean b, byte[] ret) {
     try {
+      @SuppressWarnings("resource")
       DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
       dos.writeBoolean(b);
       return ret;

Modified: gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
(original)
+++ gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
Thu Mar 20 21:13:33 2014
@@ -75,7 +75,7 @@ public class AccumuloResult<K,T extends 
     
     Iterator<Entry<Key,Value>> nextRow = iterator.next();
     ByteSequence row = getDataStore().populate(nextRow, persistent);
-    key = (K) ((AccumuloStore) dataStore).fromBytes(getKeyClass(), row.toArray());
+    key = (K) ((AccumuloStore<K, T>) dataStore).fromBytes(getKeyClass(), row.toArray());
     
     return true;
   }

Modified: gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
(original)
+++ gora/branches/GORA_94/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
Thu Mar 20 21:13:33 2014
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -38,6 +39,7 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -54,6 +56,8 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.mock.MockConnector;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.mock.MockTabletLocator;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -64,27 +68,27 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.user.TimestampFilter;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericArray;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.accumulo.encoders.BinaryEncoder;
 import org.apache.gora.accumulo.encoders.Encoder;
 import org.apache.gora.accumulo.query.AccumuloQuery;
 import org.apache.gora.accumulo.query.AccumuloResult;
-import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.State;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -105,7 +109,7 @@ import org.w3c.dom.NodeList;
  * 
  */
 public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T>
{
-  
+
   protected static final String MOCK_PROPERTY = "accumulo.mock";
   protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
   protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers";
@@ -116,36 +120,71 @@ public class AccumuloStore<K,T extends P
   private Connector conn;
   private BatchWriter batchWriter;
   private AccumuloMapping mapping;
-  private AuthInfo authInfo;
+  private TCredentials credentials;
   private Encoder encoder;
-  
+
   public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class);
-  
-  public Object fromBytes(Schema schema, byte data[]) {
-    return fromBytes(encoder, schema, data);
+
+  public Object fromBytes(Schema schema, byte data[]) throws GoraException {
+    Schema fromSchema = null;
+    if (schema.getType() == Type.UNION) {
+      try {
+        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+        int unionIndex = decoder.readIndex();
+        List<Schema> possibleTypes = schema.getTypes();
+        fromSchema = possibleTypes.get(unionIndex);
+        Schema effectiveSchema = possibleTypes.get(unionIndex);
+        if (effectiveSchema.getType() == Type.NULL) {
+          decoder.readNull();
+          return null;
+        } else {
+          data = decoder.readBytes(null).array();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        throw new GoraException("Error decoding union type: ", e);
+      }
+    } else {
+      fromSchema = schema;
+    }
+    return fromBytes(encoder, fromSchema, data);
   }
 
   public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) {
     switch (schema.getType()) {
-      case BOOLEAN:
-        return encoder.decodeBoolean(data);
-      case DOUBLE:
-        return encoder.decodeDouble(data);
-      case FLOAT:
-        return encoder.decodeFloat(data);
-      case INT:
-        return encoder.decodeInt(data);
-      case LONG:
-        return encoder.decodeLong(data);
-      case STRING:
-        return new Utf8(data);
-      case BYTES:
-        return ByteBuffer.wrap(data);
-      case ENUM:
-        return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+    case BOOLEAN:
+      return encoder.decodeBoolean(data);
+    case DOUBLE:
+      return encoder.decodeDouble(data);
+    case FLOAT:
+      return encoder.decodeFloat(data);
+    case INT:
+      return encoder.decodeInt(data);
+    case LONG:
+      return encoder.decodeLong(data);
+    case STRING:
+      return new Utf8(data);
+    case BYTES:
+      return ByteBuffer.wrap(data);
+    case ENUM:
+      return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+    case ARRAY:
+      break;
+    case FIXED:
+      break;
+    case MAP:
+      break;
+    case NULL:
+      break;
+    case RECORD:
+      break;
+    case UNION:
+      break;
+    default:
+      break;
     }
     throw new IllegalArgumentException("Unknown type " + schema.getType());
-    
+
   }
 
   public K fromBytes(Class<K> clazz, byte[] val) {
@@ -174,7 +213,7 @@ public class AccumuloStore<K,T extends P
       } else if (clazz.equals(Utf8.class)) {
         return (K) new Utf8(val);
       }
-      
+
       throw new IllegalArgumentException("Unknown type " + clazz.getName());
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
@@ -190,17 +229,67 @@ public class AccumuloStore<K,T extends P
     return b;
   }
 
+  public byte[] toBytes(Schema toSchema, Object o) {
+    if (toSchema != null && toSchema.getType() == Type.UNION) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos,
null);
+      int unionIndex = 0;
+      try {
+        if (o == null) {
+          unionIndex = firstNullSchemaTypeIndex(toSchema);
+          avroEncoder.writeIndex(unionIndex);
+          avroEncoder.writeNull();
+        } else {
+          unionIndex = firstNotNullSchemaTypeIndex(toSchema);
+          avroEncoder.writeIndex(unionIndex);
+          avroEncoder.writeBytes(toBytes(o));
+        }
+        avroEncoder.flush();
+        return baos.toByteArray();
+      } catch (IOException e) {
+        e.printStackTrace();
+        return toBytes(o);
+      }
+    } else {     
+      return toBytes(o);
+    }
+  }
+
+  private int firstNullSchemaTypeIndex(Schema toSchema) {
+    List<Schema> possibleTypes = toSchema.getTypes();
+    int unionIndex = 0;
+    for (int i = 0; i < possibleTypes.size(); i++ ) {
+      Type pType = possibleTypes.get(i).getType();
+      if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
+        unionIndex = i; break;
+      }
+    }
+    return unionIndex;
+  }
+
+  private int firstNotNullSchemaTypeIndex(Schema toSchema) {
+    List<Schema> possibleTypes = toSchema.getTypes();
+    int unionIndex = 0;
+    for (int i = 0; i < possibleTypes.size(); i++ ) {
+      Type pType = possibleTypes.get(i).getType();
+      if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
+        unionIndex = i; break;
+      }
+    }
+    return unionIndex;
+  }
+
   public byte[] toBytes(Object o) {
     return toBytes(encoder, o);
   }
-  
+
   public static byte[] toBytes(Encoder encoder, Object o) {
-    
+
     try {
       if (o instanceof String) {
         return ((String) o).getBytes("UTF-8");
       } else if (o instanceof Utf8) {
-        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getLength());
+        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength());
       } else if (o instanceof ByteBuffer) {
         return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer)
o).position(), ((ByteBuffer) o).remaining());
       } else if (o instanceof Long) {
@@ -218,19 +307,23 @@ public class AccumuloStore<K,T extends P
       } else if (o instanceof Double) {
         return encoder.encodeDouble((Double) o);
       } else if (o instanceof Enum) {
-        return encoder.encodeInt(((Enum) o).ordinal());
+        return encoder.encodeInt(((Enum<?>) o).ordinal());
       }
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
-    
+
     throw new IllegalArgumentException("Uknown type " + o.getClass().getName());
   }
 
   private BatchWriter getBatchWriter() throws IOException {
     if (batchWriter == null)
       try {
-        batchWriter = conn.createBatchWriter(mapping.tableName, 10000000, 60000l, 4);
+        BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+        batchWriterConfig.setMaxMemory(10000000);
+        batchWriterConfig.setMaxLatency(60000l, TimeUnit.MILLISECONDS);
+        batchWriterConfig.setMaxWriteThreads(4);
+        batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig);
       } catch (TableNotFoundException e) {
         throw new IOException(e);
       }
@@ -241,16 +334,16 @@ public class AccumuloStore<K,T extends P
   public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties
properties) {
     try{
       super.initialize(keyClass, persistentClass, properties);
-  
+
       String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
       String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
       String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
       String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY,
null);
-      
+
       mapping = readMapping(mappingFile);
-  
+
       if (mapping.encoder == null || mapping.encoder.equals("")) {
-        encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
+        encoder = new BinaryEncoder();
       } else {
         try {
           encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
@@ -262,17 +355,23 @@ public class AccumuloStore<K,T extends P
           throw new IOException(e);
         }
       }
-  
+
       try {
+        AuthenticationToken token =  new PasswordToken(password);
         if (mock == null || !mock.equals("true")) {
           String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY,
null);
           String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY,
null);
-          conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
-          authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
+          credentials = new TCredentials(user, 
+              "org.apache.accumulo.core.client.security.tokens.PasswordToken", 
+              ByteBuffer.wrap(password.getBytes()), instance);
+          conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token);
         } else {
-          conn = new MockInstance().getConnector(user, password);
+          conn = new MockInstance().getConnector(user, new PasswordToken(password));
+          credentials = new TCredentials(user, 
+              "org.apache.accumulo.core.client.security.tokens.PasswordToken", 
+              ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
         }
-  
+
         if (autoCreateSchema)
           createSchema();
       } catch (AccumuloException e) {
@@ -284,27 +383,27 @@ public class AccumuloStore<K,T extends P
       LOG.error(e.getMessage(), e);
     }
   }
-  
+
   protected AccumuloMapping readMapping(String filename) throws IOException {
     try {
-      
+
       AccumuloMapping mapping = new AccumuloMapping();
 
       DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
       Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
-      
+
       Element root = dom.getDocumentElement();
-      
+
       NodeList nl = root.getElementsByTagName("class");
       for (int i = 0; i < nl.getLength(); i++) {
-        
+
         Element classElement = (Element) nl.item(i);
         if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
             && classElement.getAttribute("name").equals(persistentClass.getCanonicalName()))
{
 
           mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass);
           mapping.encoder = classElement.getAttribute("encoder");
-          
+
           NodeList fields = classElement.getElementsByTagName("field");
           for (int j = 0; j < fields.getLength(); j++) {
             Element fieldElement = (Element) fields.item(j);
@@ -324,9 +423,9 @@ public class AccumuloStore<K,T extends P
       }
 
       if (mapping.tableName == null) {
-        throw new GoraException("Please define the gora to accumulo mapping in " + filename
+ " for " + persistentClass.getCanonicalName());
+        throw new GoraException("Please define the accumulo 'table' name mapping in " + filename
+ " for " + persistentClass.getCanonicalName());
       }
-      
+
       nl = root.getElementsByTagName("table");
       for (int i = 0; i < nl.getLength(); i++) {
         Element tableElement = (Element) nl.item(i);
@@ -347,12 +446,12 @@ public class AccumuloStore<K,T extends P
     }
 
   }
-  
+
   @Override
   public String getSchemaName() {
     return mapping.tableName;
   }
-  
+
   @Override
   public void createSchema() {
     try {
@@ -394,20 +493,30 @@ public class AccumuloStore<K,T extends P
 
   public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent)
throws IOException {
     ByteSequence row = null;
-    
-    Map currentMap = null;
-    ArrayList currentArray = null;
+
+    Map<Utf8, Object> currentMap = null;
+    List currentArray = null;
     Text currentFam = null;
     int currentPos = 0;
     Schema currentSchema = null;
     Field currentField = null;
 
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
+
     while (iter.hasNext()) {
       Entry<Key,Value> entry = iter.next();
-      
+
+      if (row == null) {
+        row = entry.getKey().getRowData();
+      }
+      byte[] val = entry.getValue().get();
+
+      Field field = fieldMap.get(getFieldName(entry));
+
       if (currentMap != null) {
         if (currentFam.equals(entry.getKey().getColumnFamily())) {
-          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema,
entry.getValue().get()));
+          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), 
+              fromBytes(currentSchema, entry.getValue().get()));
           continue;
         } else {
           persistent.put(currentPos, currentMap);
@@ -418,57 +527,69 @@ public class AccumuloStore<K,T extends P
           currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
           continue;
         } else {
-          persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(),
currentArray));
+          persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(),
currentArray));
           currentArray = null;
         }
       }
 
-      if (row == null)
-        row = entry.getKey().getRowData();
-      
-      String fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(),
entry.getKey().getColumnQualifier()));
-      if (fieldName == null)
-        fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(),
null));
-
-      Field field = fieldMap.get(fieldName);
-
       switch (field.schema().getType()) {
-        case MAP:
-          currentMap = new StatefulHashMap();
+      case MAP:  // first entry only. Next are handled above on the next loop
+        currentMap = new DirtyMapWrapper<Utf8, Object>(new HashMap<Utf8, Object>());
+        currentPos = field.pos();
+        currentFam = entry.getKey().getColumnFamily();
+        currentSchema = field.schema().getValueType();
+
+        currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), 
+            fromBytes(currentSchema, entry.getValue().get()));
+        break;
+      case ARRAY:
+        currentArray = new DirtyListWrapper<Object>(new ArrayList<Object>());
+        currentPos = field.pos();
+        currentFam = entry.getKey().getColumnFamily();
+        currentSchema = field.schema().getElementType();
+        currentField = field;
+
+        currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+
+        break;
+      case UNION:// default value of null acts like union with null
+        Schema effectiveSchema = field.schema().getTypes()
+        .get(firstNotNullSchemaTypeIndex(field.schema()));
+        // map and array were coded without union index so need to be read the same way
+        if (effectiveSchema.getType() == Type.ARRAY) {
+          currentArray = new DirtyListWrapper<Object>(new ArrayList<Object>());
           currentPos = field.pos();
           currentFam = entry.getKey().getColumnFamily();
-          currentSchema = field.schema().getValueType();
-          
-          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema,
entry.getValue().get()));
+          currentSchema = field.schema().getElementType();
+          currentField = field;
 
+          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
           break;
-        case ARRAY:
-          currentArray = new ArrayList();
+        }
+        else if (effectiveSchema.getType() == Type.MAP) {
+          currentMap = new DirtyMapWrapper<Utf8, Object>(new HashMap<Utf8, Object>());
           currentPos = field.pos();
           currentFam = entry.getKey().getColumnFamily();
-          currentSchema = field.schema().getElementType();
-          currentField = field;
-          
-          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+          currentSchema = effectiveSchema.getValueType();
 
+          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), 
+              fromBytes(currentSchema, entry.getValue().get()));
           break;
-        case RECORD:
-        case UNION:
-          SpecificDatumReader reader = new SpecificDatumReader(field.schema());
-          byte[] val = entry.getValue().get();
-          // TODO reuse decoder
-          BinaryDecoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(val,
null);
-          persistent.put(field.pos(), reader.read(null, decoder));
-          break;
-        default:
-          persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
+        }
+        // continue like a regular top-level union
+      case RECORD:
+        SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema());
+        persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val,
decoder)));
+        break;
+      default:
+        persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
       }
     }
-    
+
     if (currentMap != null) {
       persistent.put(currentPos, currentMap);
     } else if (currentArray != null) {
-      persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
+      persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray));
     }
 
     persistent.clearDirty();
@@ -476,14 +597,32 @@ public class AccumuloStore<K,T extends P
     return row;
   }
 
+  /**
+   * Retrieve field name from entry.
+   * @param entry The Key-Value entry
+   * @return String The field name
+   */
+  private String getFieldName(Entry<Key, Value> entry) {
+    String fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(),

+        entry.getKey().getColumnQualifier()));
+    if (fieldName == null) {
+      fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(),
null));
+    }
+    return fieldName;
+  }
+
   private void setFetchColumns(Scanner scanner, String fields[]) {
     fields = getFieldsToQuery(fields);
     for (String field : fields) {
       Pair<Text,Text> col = mapping.fieldMap.get(field);
-      if (col.getSecond() == null) {
-        scanner.fetchColumnFamily(col.getFirst());
+      if (col != null) {
+        if (col.getSecond() == null) {
+          scanner.fetchColumnFamily(col.getFirst());
+        } else {
+          scanner.fetchColumn(col.getFirst(), col.getSecond());
+        }
       } else {
-        scanner.fetchColumn(col.getFirst(), col.getSecond());
+        LOG.error("Mapping not found for field: " + field);
       }
     }
   }
@@ -494,10 +633,10 @@ public class AccumuloStore<K,T extends P
       // TODO make isolated scanner optional?
       Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
       Range rowRange = new Range(new Text(toBytes(key)));
-      
+
       scanner.setRange(rowRange);
       setFetchColumns(scanner, fields);
-      
+
       T persistent = newPersistent();
       ByteSequence row = populate(scanner.iterator(), persistent);
       if (row == null)
@@ -511,90 +650,67 @@ public class AccumuloStore<K,T extends P
       return null;
     }
   }
-  
+
   @Override
   public void put(K key, T val) {
 
     try{
       Mutation m = new Mutation(new Text(toBytes(key)));
-      
+
       Schema schema = val.getSchema();
-      StateManager stateManager = val.getStateManager();
-      
-      Iterator<Field> iter = schema.getFields().iterator();
-      
+      List<Field> fields = schema.getFields();
       int count = 0;
-      for (int i = 0; iter.hasNext(); i++) {
-        Field field = iter.next();
-        if (!stateManager.isDirty(val, i)) {
+
+      for (int i = 1; i < fields.size(); i++) {
+        if (!val.isDirty(i)) {
           continue;
         }
-        
-        Object o = val.get(i);
+        Field field = fields.get(i);
+
+        Object o = val.get(field.pos());       
+
         Pair<Text,Text> col = mapping.fieldMap.get(field.name());
 
         if (col == null) {
           throw new GoraException("Please define the gora to accumulo mapping for field "
+ field.name());
         }
 
-  
         switch (field.schema().getType()) {
-          case MAP:
-            if (o instanceof StatefulMap) {
-              StatefulMap map = (StatefulMap) o;
-              Set<?> es = map.states().entrySet();
-              for (Object entry : es) {
-                Object mapKey = ((Entry) entry).getKey();
-                State state = (State) ((Entry) entry).getValue();
-  
-                switch (state) {
-                  case NEW:
-                  case DIRTY:
-                    m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
-                    count++;
-                    break;
-                  case DELETED:
-                    m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
-                    count++;
-                    break;
-                }
-                
-              }
-            } else {
-              Map map = (Map) o;
-              Set<?> es = map.entrySet();
-              for (Object entry : es) {
-                Object mapKey = ((Entry) entry).getKey();
-                Object mapVal = ((Entry) entry).getValue();
-                m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
-                count++;
-              }
-            }
-            break;
-          case ARRAY:
-            GenericArray array = (GenericArray) o;
-            int j = 0;
-            for (Object item : array) {
-              m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
-              count++;
-            }
+        case MAP:
+          count = putMap(m, count, field.schema().getValueType(), o, col);
+          break;
+        case ARRAY:
+          count = putArray(m, count, o, col);
+          break;
+        case UNION: // default value of null acts like union with null
+          Schema effectiveSchema = field.schema().getTypes()
+          .get(firstNotNullSchemaTypeIndex(field.schema()));
+          // map and array need to compute qualifier
+          if (effectiveSchema.getType() == Type.ARRAY) {
+            count = putArray(m, count, o, col);
             break;
-          case RECORD:
-          case UNION:
-            SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
-            ByteArrayOutputStream os = new ByteArrayOutputStream();
-            BinaryEncoder encoder = new BinaryEncoder(os);
-            writer.write(o, encoder);
-            encoder.flush();
-            m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+          }
+          else if (effectiveSchema.getType() == Type.MAP) {
+            count = putMap(m, count, effectiveSchema.getValueType(), o, col);
             break;
-          default:
-            m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
-            count++;
+          }
+          // continue like a regular top-level union
+        case RECORD:
+          SpecificDatumWriter<Object> writer = new SpecificDatumWriter<Object>(field.schema());
+          ByteArrayOutputStream os = new ByteArrayOutputStream();
+          org.apache.avro.io.BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os,
null);
+          writer.write(o, encoder);
+          encoder.flush();
+          m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+          count++;
+          break;
+        default:
+          m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
+          count++;
         }
-  
+
       }
-      
+
       if (count > 0)
         try {
           getBatchWriter().addMutation(m);
@@ -605,7 +721,32 @@ public class AccumuloStore<K,T extends P
       LOG.error(e.getMessage(), e);
     }
   }
-  
+
+  private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text>
col) throws GoraException {
+    Set<?> es = ((Map<?, ?>)o).entrySet();
+    for (Object entry : es) {
+      Object mapKey = ((Entry<?, ?>) entry).getKey();
+      Object mapVal = ((Entry<?, ?>) entry).getValue();                  
+      if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>)o).isDirty())
+          || !(o instanceof DirtyMapWrapper)) { //mapVal instanceof Dirtyable &&
((Dirtyable)mapVal).isDirty()) {
+        m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal)));
+        count++;
+      }
+      // TODO map value deletion
+    }
+    return count;
+  }
+
+  private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col) {
+    List<?> array = (List<?>) o;  // both GenericArray and DirtyListWrapper
+    int j = 0;
+    for (Object item : array) {
+      m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+      count++;
+    }
+    return count;
+  }
+
   @Override
   public boolean delete(K key) {
     Query<K,T> q = newQuery();
@@ -620,7 +761,7 @@ public class AccumuloStore<K,T extends P
       // add iterator that drops values on the server side
       scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
       RowIterator iterator = new RowIterator(scanner.iterator());
-      
+
       long count = 0;
 
       while (iterator.hasNext()) {
@@ -637,7 +778,7 @@ public class AccumuloStore<K,T extends P
         getBatchWriter().addMutation(m);
         count++;
       }
-      
+
       return count;
     } catch (TableNotFoundException e) {
       // TODO return 0?
@@ -655,34 +796,34 @@ public class AccumuloStore<K,T extends P
   private Range createRange(Query<K,T> query) {
     Text startRow = null;
     Text endRow = null;
-    
+
     if (query.getStartKey() != null)
       startRow = new Text(toBytes(query.getStartKey()));
-    
+
     if (query.getEndKey() != null)
       endRow = new Text(toBytes(query.getEndKey()));
-    
+
     return new Range(startRow, true, endRow, true);
-    
+
   }
-  
+
   private Scanner createScanner(Query<K,T> query) throws TableNotFoundException {
     // TODO make isolated scanner optional?
     Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
     setFetchColumns(scanner, query.getFields());
-    
+
     scanner.setRange(createRange(query));
-    
+
     if (query.getStartTime() != -1 || query.getEndTime() != -1) {
       IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
       if (query.getStartTime() != -1)
         TimestampFilter.setStart(is, query.getStartTime(), true);
       if (query.getEndTime() != -1)
         TimestampFilter.setEnd(is, query.getEndTime(), true);
-      
+
       scanner.addScanIterator(is);
     }
-    
+
     return scanner;
   }
 
@@ -697,7 +838,7 @@ public class AccumuloStore<K,T extends P
       return null;
     } 
   }
-  
+
   @Override
   public Query<K,T> newQuery() {
     return new AccumuloQuery<K,T>(this);
@@ -706,14 +847,14 @@ public class AccumuloStore<K,T extends P
   Text pad(Text key, int bytes) {
     if (key.getLength() < bytes)
       key = new Text(key);
-    
+
     while (key.getLength() < bytes) {
       key.append(new byte[] {0}, 0, 1);
     }
-    
+
     return key;
   }
-  
+
   @Override
   public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws
IOException {
     try {
@@ -721,12 +862,12 @@ public class AccumuloStore<K,T extends P
       if (conn instanceof MockConnector)
         tl = new MockTabletLocator();
       else
-        tl = TabletLocator.getInstance(conn.getInstance(), authInfo, new Text(Tables.getTableId(conn.getInstance(),
mapping.tableName)));
-      
+        tl = TabletLocator.getInstance(conn.getInstance(), new Text(Tables.getTableId(conn.getInstance(),
mapping.tableName)));
+
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-      
+
       tl.invalidateCache();
-      while (tl.binRanges(Collections.singletonList(createRange(query)), binnedRanges).size()
> 0) {
+      while (tl.binRanges(Collections.singletonList(createRange(query)), binnedRanges, credentials).size()
> 0) {
         // TODO log?
         if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)))
           throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
@@ -735,19 +876,19 @@ public class AccumuloStore<K,T extends P
         UtilWaitThread.sleep(100);
         tl.invalidateCache();
       }
-      
+
       List<PartitionQuery<K,T>> ret = new ArrayList<PartitionQuery<K,T>>();
-      
+
       Text startRow = null;
       Text endRow = null;
       if (query.getStartKey() != null)
         startRow = new Text(toBytes(query.getStartKey()));
       if (query.getEndKey() != null)
         endRow = new Text(toBytes(query.getEndKey()));
-     
+
       //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert
       HashMap<String,String> hostNameCache = new HashMap<String,String>();
- 
+
       for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet())
{
         String ip = entry.getKey().split(":", 2)[0];
         String location = hostNameCache.get(ip);
@@ -759,7 +900,7 @@ public class AccumuloStore<K,T extends P
 
         Map<KeyExtent,List<Range>> tablets = entry.getValue();
         for (KeyExtent ke : tablets.keySet()) {
-          
+
           K startKey = null;
           if (startRow == null || !ke.contains(startRow)) {
             if (ke.getPrevEndRow() != null) {
@@ -768,7 +909,7 @@ public class AccumuloStore<K,T extends P
           } else {
             startKey = fromBytes(getKeyClass(), TextUtil.getBytes(startRow));
           }
-          
+
           K endKey = null;
           if (endRow == null || !ke.contains(endRow)) {
             if (ke.getEndRow() != null)
@@ -776,13 +917,13 @@ public class AccumuloStore<K,T extends P
           } else {
             endKey = fromBytes(getKeyClass(), TextUtil.getBytes(endRow));
           }
-          
-          PartitionQueryImpl pqi = new PartitionQueryImpl<K,T>(query, startKey, endKey,
new String[] {location});
+
+          PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<K,T>(query, startKey,
endKey, new String[] {location});
           pqi.setConf(getConf());
           ret.add(pqi);
         }
       }
-      
+
       return ret;
     } catch (TableNotFoundException e) {
       throw new IOException(e);
@@ -791,11 +932,11 @@ public class AccumuloStore<K,T extends P
     } catch (AccumuloSecurityException e) {
       throw new IOException(e);
     }
-    
+
   }
-  
+
   static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
-    
+
     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
       throw new UnsupportedOperationException();
     } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
@@ -815,19 +956,20 @@ public class AccumuloStore<K,T extends P
     } else if (clazz.equals(Utf8.class)) {
       return fromBytes(encoder, clazz, er);
     }
-    
+
     throw new IllegalArgumentException("Unknown type " + clazz.getName());
   }
 
 
-  
+
   /**
    * @param keyClass
    * @param bytes
    * @return
    */
+  @SuppressWarnings("unchecked")
   static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
-    
+
     if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
       return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
     } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {

Modified: gora/branches/GORA_94/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml (original)
+++ gora/branches/GORA_94/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml Thu Mar
20 21:13:33 2014
@@ -47,6 +47,7 @@
     <field name="content" family="content" qualifier="c"/>
     <field name="parsedContent" family="parsedContent"/>
     <field name="outlinks" family="outlinks"/>
+    <field name="headers" family="headers"/>
     <field name="metadata" family="common" qualifier="metadata"/>
   </class>
 

Modified: gora/branches/GORA_94/gora-accumulo/src/test/resources/gora.properties
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-accumulo/src/test/resources/gora.properties?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-accumulo/src/test/resources/gora.properties (original)
+++ gora/branches/GORA_94/gora-accumulo/src/test/resources/gora.properties Thu Mar 20 21:13:33
2014
@@ -18,4 +18,4 @@ gora.datastore.accumulo.mock=true
 gora.datastore.accumulo.instance=a14
 gora.datastore.accumulo.zookeepers=localhost
 gora.datastore.accumulo.user=root
-gora.datastore.accumulo.password=secret
\ No newline at end of file
+gora.datastore.accumulo.password=
\ No newline at end of file

Modified: gora/branches/GORA_94/pom.xml
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/pom.xml?rev=1579741&r1=1579740&r2=1579741&view=diff
==============================================================================
--- gora/branches/GORA_94/pom.xml (original)
+++ gora/branches/GORA_94/pom.xml Thu Mar 20 21:13:33 2014
@@ -576,7 +576,7 @@
         <module>gora-compiler-cli</module>
         <module>gora-core</module>
         <module>gora-hbase</module>
-        <!--module>gora-accumulo</module-->
+        <module>gora-accumulo</module>
         <module>gora-cassandra</module>
         <!-- module>gora-solr</module-->
         <!--module>gora-dynamodb</module-->



Mime
View raw message