gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject [05/50] [abbrv] GORA-321. Merge GORA_94 into Gora trunk
Date Wed, 04 Jun 2014 16:36:22 GMT
http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
----------------------------------------------------------------------
diff --git a/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java b/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
index 221a4cb..46a3386 100644
--- a/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
+++ b/gora-solr/src/main/java/org/apache/gora/solr/store/SolrStore.java
@@ -17,13 +17,19 @@ package org.apache.gora.solr.store;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -53,8 +59,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
-    
-  private static final Logger LOG = LoggerFactory.getLogger( SolrStore.class );
+
+  private static final Logger LOG = LoggerFactory.getLogger(SolrStore.class);
 
   protected static final String DEFAULT_MAPPING_FILE = "gora-solr-mapping.xml";
 
@@ -63,15 +69,15 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   protected static final String SOLR_CONFIG_PROPERTY = "solr.config";
 
   protected static final String SOLR_SCHEMA_PROPERTY = "solr.schema";
-    
+
   protected static final String SOLR_BATCH_SIZE_PROPERTY = "solr.batchSize";
-    
-  //protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
+
+  // protected static final String SOLR_SOLRJSERVER_IMPL = "solr.solrjserver";
 
   protected static final String SOLR_COMMIT_WITHIN_PROPERTY = "solr.commitWithin";
 
   protected static final String SOLR_RESULTS_SIZE_PROPERTY = "solr.resultsSize";
-    
+
   protected static final int DEFAULT_BATCH_SIZE = 100;
 
   protected static final int DEFAULT_COMMIT_WITHIN = 1000;
@@ -92,86 +98,116 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
 
   private int resultsSize = DEFAULT_RESULTS_SIZE;
 
-  @Override
-  public void initialize( Class<K> keyClass, Class<T> persistentClass, Properties properties ) {
-    super.initialize( keyClass, persistentClass, properties );
+  /**
+   * Default schema index with value "0" used when AVRO Union data types are
+   * stored
+   */
+  public static int DEFAULT_UNION_SCHEMA = 0;
+
+  /*
+   * Create a threadlocal map for the datum readers and writers, because they
+   * are not thread safe, at least not before Avro 1.4.0 (See AVRO-650). When
+   * they are thread safe, it is possible to maintain a single reader and writer
+   * pair for every schema, instead of one for every thread.
+   */
+
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
 
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) {
+    super.initialize(keyClass, persistentClass, properties);
     try {
-      String mappingFile = DataStoreFactory.getMappingFile( properties, this, DEFAULT_MAPPING_FILE );
-      mapping = readMapping( mappingFile );
-    }
-    catch ( IOException e ) {
-      LOG.error( e.getMessage() );
-      LOG.error( e.getStackTrace().toString() );
+      String mappingFile = DataStoreFactory.getMappingFile(properties, this,
+          DEFAULT_MAPPING_FILE);
+      mapping = readMapping(mappingFile);
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     }
 
-    solrServerUrl = DataStoreFactory.findProperty( properties, this, SOLR_URL_PROPERTY, null );
-    solrConfig = DataStoreFactory.findProperty( properties, this, SOLR_CONFIG_PROPERTY, null );
-    solrSchema = DataStoreFactory.findProperty( properties, this, SOLR_SCHEMA_PROPERTY, null );
-    LOG.info( "Using Solr server at " + solrServerUrl );
-    adminServer = new HttpSolrServer( solrServerUrl );
-    server = new HttpSolrServer( solrServerUrl + "/" + mapping.getCoreName() );
-    if ( autoCreateSchema ) {
+    solrServerUrl = DataStoreFactory.findProperty(properties, this,
+        SOLR_URL_PROPERTY, null);
+    solrConfig = DataStoreFactory.findProperty(properties, this,
+        SOLR_CONFIG_PROPERTY, null);
+    solrSchema = DataStoreFactory.findProperty(properties, this,
+        SOLR_SCHEMA_PROPERTY, null);
+    LOG.info("Using Solr server at " + solrServerUrl);
+    adminServer = new HttpSolrServer(solrServerUrl);
+    server = new HttpSolrServer(solrServerUrl + "/" + mapping.getCoreName());
+    if (autoCreateSchema) {
       createSchema();
     }
-    String batchSizeString = DataStoreFactory.findProperty( properties, this, SOLR_BATCH_SIZE_PROPERTY, null );
-    if ( batchSizeString != null ) {
+    String batchSizeString = DataStoreFactory.findProperty(properties, this,
+        SOLR_BATCH_SIZE_PROPERTY, null);
+    if (batchSizeString != null) {
       try {
-        batchSize = Integer.parseInt( batchSizeString );
-      } catch ( NumberFormatException nfe ) {
-        LOG.warn( "Invalid batch size '" + batchSizeString + "', using default " + DEFAULT_BATCH_SIZE );
+        batchSize = Integer.parseInt(batchSizeString);
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Invalid batch size '" + batchSizeString + "', using default "
+            + DEFAULT_BATCH_SIZE);
       }
     }
-    batch = new ArrayList<SolrInputDocument>( batchSize );
-    String commitWithinString = DataStoreFactory.findProperty( properties, this, SOLR_COMMIT_WITHIN_PROPERTY, null );
-    if ( commitWithinString != null ) {
+    batch = new ArrayList<SolrInputDocument>(batchSize);
+    String commitWithinString = DataStoreFactory.findProperty(properties, this,
+        SOLR_COMMIT_WITHIN_PROPERTY, null);
+    if (commitWithinString != null) {
       try {
-        commitWithin = Integer.parseInt( commitWithinString );
-      } catch ( NumberFormatException nfe ) {
-        LOG.warn( "Invalid commit within '" + commitWithinString + "', using default " + DEFAULT_COMMIT_WITHIN );
+        commitWithin = Integer.parseInt(commitWithinString);
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Invalid commit within '" + commitWithinString
+            + "', using default " + DEFAULT_COMMIT_WITHIN);
       }
     }
-    String resultsSizeString = DataStoreFactory.findProperty( properties, this, SOLR_RESULTS_SIZE_PROPERTY, null );
-    if ( resultsSizeString != null ) {
+    String resultsSizeString = DataStoreFactory.findProperty(properties, this,
+        SOLR_RESULTS_SIZE_PROPERTY, null);
+    if (resultsSizeString != null) {
       try {
-        resultsSize = Integer.parseInt( resultsSizeString );
-      } catch ( NumberFormatException nfe ) {
-        LOG.warn( "Invalid results size '" + resultsSizeString + "', using default " + DEFAULT_RESULTS_SIZE );
+        resultsSize = Integer.parseInt(resultsSizeString);
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Invalid results size '" + resultsSizeString
+            + "', using default " + DEFAULT_RESULTS_SIZE);
       }
     }
   }
 
   @SuppressWarnings("unchecked")
-  private SolrMapping readMapping( String filename ) throws IOException {
+  private SolrMapping readMapping(String filename) throws IOException {
     SolrMapping map = new SolrMapping();
     try {
       SAXBuilder builder = new SAXBuilder();
-      Document doc = builder.build( getClass().getClassLoader().getResourceAsStream( filename ) );
+      Document doc = builder.build(getClass().getClassLoader()
+          .getResourceAsStream(filename));
 
-      List<Element> classes = doc.getRootElement().getChildren( "class" );
+      List<Element> classes = doc.getRootElement().getChildren("class");
 
-      for ( Element classElement : classes ) {
-        if ( classElement.getAttributeValue( "keyClass" ).equals( keyClass.getCanonicalName() )
-            && classElement.getAttributeValue( "name" ).equals( persistentClass.getCanonicalName() ) ) {
+      for (Element classElement : classes) {
+        if (classElement.getAttributeValue("keyClass").equals(
+            keyClass.getCanonicalName())
+            && classElement.getAttributeValue("name").equals(
+                persistentClass.getCanonicalName())) {
 
-          String tableName = getSchemaName( classElement.getAttributeValue( "table" ), persistentClass );
-          map.setCoreName( tableName );
+          String tableName = getSchemaName(
+              classElement.getAttributeValue("table"), persistentClass);
+          map.setCoreName(tableName);
 
-          Element primaryKeyEl = classElement.getChild( "primarykey" );
-          map.setPrimaryKey( primaryKeyEl.getAttributeValue( "column" ) );
+          Element primaryKeyEl = classElement.getChild("primarykey");
+          map.setPrimaryKey(primaryKeyEl.getAttributeValue("column"));
 
-          List<Element> fields = classElement.getChildren( "field" );
+          List<Element> fields = classElement.getChildren("field");
 
-          for ( Element field : fields ) {
-            String fieldName = field.getAttributeValue( "name" );
-            String columnName = field.getAttributeValue( "column" );
-            map.addField( fieldName, columnName );
+          for (Element field : fields) {
+            String fieldName = field.getAttributeValue("name");
+            String columnName = field.getAttributeValue("column");
+            map.addField(fieldName, columnName);
           }
           break;
         }
       }
-    } catch ( Exception ex ) {
-      throw new IOException( ex );
+    } catch (Exception ex) {
+      throw new IOException(ex);
     }
 
     return map;
@@ -189,11 +225,11 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   @Override
   public void createSchema() {
     try {
-      if ( !schemaExists() )
-          CoreAdminRequest.createCore( mapping.getCoreName(), mapping.getCoreName(), adminServer, solrConfig,
-          solrSchema );
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      if (!schemaExists())
+        CoreAdminRequest.createCore(mapping.getCoreName(),
+            mapping.getCoreName(), adminServer, solrConfig, solrSchema);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
   }
 
@@ -201,11 +237,11 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   /** Default implementation deletes and recreates the schema*/
   public void truncateSchema() {
     try {
-      server.deleteByQuery( "*:*" );
+      server.deleteByQuery("*:*");
       server.commit();
-    } catch ( Exception e ) {
+    } catch (Exception e) {
       // ignore?
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
   }
 
@@ -213,20 +249,20 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   public void deleteSchema() {
     // XXX should this be only in truncateSchema ???
     try {
-      server.deleteByQuery( "*:*" );
+      server.deleteByQuery("*:*");
       server.commit();
-    } catch ( Exception e ) {
-    // ignore?
-    // LOG.error(e.getMessage());
-    // LOG.error(e.getStackTrace().toString());
+    } catch (Exception e) {
+      // ignore?
+      // LOG.error(e.getMessage());
+      // LOG.error(e.getStackTrace().toString());
     }
     try {
-      CoreAdminRequest.unloadCore( mapping.getCoreName(), adminServer );
-    } catch ( Exception e ) {
-      if ( e.getMessage().contains( "No such core" ) ) {
+      CoreAdminRequest.unloadCore(mapping.getCoreName(), adminServer);
+    } catch (Exception e) {
+      if (e.getMessage().contains("No such core")) {
         return; // it's ok, the core is not there
       } else {
-        LOG.error( e.getMessage(), e.getStackTrace().toString() );
+        LOG.error(e.getMessage(), e.getStackTrace().toString());
       }
     }
   }
@@ -235,239 +271,366 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   public boolean schemaExists() {
     boolean exists = false;
     try {
-      CoreAdminResponse rsp = CoreAdminRequest.getStatus( mapping.getCoreName(), adminServer );
-      exists = rsp.getUptime( mapping.getCoreName() ) != null;
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      CoreAdminResponse rsp = CoreAdminRequest.getStatus(mapping.getCoreName(),
+          adminServer);
+      exists = rsp.getUptime(mapping.getCoreName()) != null;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return exists;
   }
 
-  private static final String toDelimitedString( String[] arr, String sep ) {
-    if ( arr == null || arr.length == 0 ) {
+  private static final String toDelimitedString(String[] arr, String sep) {
+    if (arr == null || arr.length == 0) {
       return "";
     }
     StringBuilder sb = new StringBuilder();
-    for ( int i = 0; i < arr.length; i++ ) {
-      if ( i > 0 )
-        sb.append( sep );
-        sb.append( arr[i] );
+    for (int i = 0; i < arr.length; i++) {
+      if (i > 0)
+        sb.append(sep);
+      sb.append(arr[i]);
     }
     return sb.toString();
   }
 
-  public static String escapeQueryKey( String key ) {
-    if ( key == null ) {
+  public static String escapeQueryKey(String key) {
+    if (key == null) {
       return null;
     }
     StringBuilder sb = new StringBuilder();
-    for ( int i = 0; i < key.length(); i++ ) {
-      char c = key.charAt( i );
-      switch ( c ) {
-        case ':':
-        case '*':
-          sb.append( "\\" + c );
-          break;
-        default:
-        sb.append( c );
+    for (int i = 0; i < key.length(); i++) {
+      char c = key.charAt(i);
+      switch (c) {
+      case ':':
+      case '*':
+        sb.append("\\" + c);
+        break;
+      default:
+        sb.append(c);
       }
     }
     return sb.toString();
   }
 
   @Override
-  public T get( K key, String[] fields ) {
+  public T get(K key, String[] fields) {
     ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set( CommonParams.QT, "/get" );
-    params.set( CommonParams.FL, toDelimitedString( fields, "," ) );
-    params.set( "id",  key.toString() );
+    params.set(CommonParams.QT, "/get");
+    params.set(CommonParams.FL, toDelimitedString(fields, ","));
+    params.set("id", key.toString());
     try {
-      QueryResponse rsp = server.query( params );
-      Object o = rsp.getResponse().get( "doc" );
-      if ( o == null ) {
+      QueryResponse rsp = server.query(params);
+      Object o = rsp.getResponse().get("doc");
+      if (o == null) {
         return null;
       }
-      return newInstance( (SolrDocument)o, fields );
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      return newInstance((SolrDocument) o, fields);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return null;
   }
 
-  public T newInstance( SolrDocument doc, String[] fields )
-      throws IOException {
+  public T newInstance(SolrDocument doc, String[] fields) throws IOException {
     T persistent = newPersistent();
-    if ( fields == null ) {
-      fields = fieldMap.keySet().toArray( new String[fieldMap.size()] );
+    if (fields == null) {
+      fields = fieldMap.keySet().toArray(new String[fieldMap.size()]);
     }
     String pk = mapping.getPrimaryKey();
-    for ( String f : fields ) {
-      Field field = fieldMap.get( f );
+    for (String f : fields) {
+      Field field = fieldMap.get(f);
       Schema fieldSchema = field.schema();
       String sf = null;
-      if ( pk.equals( f ) ) {
+      if (pk.equals(f)) {
         sf = f;
       } else {
-        sf = mapping.getSolrField( f );                
+        sf = mapping.getSolrField(f);
       }
-      Object sv = doc.get( sf );
-      Object v;
-      if ( sv == null ) {
+      Object sv = doc.get(sf);
+      if (sv == null) {
         continue;
       }
-      switch ( fieldSchema.getType() ) {
-        case MAP:
-        case ARRAY:
-        case RECORD:
-          v = IOUtils.deserialize( (byte[]) sv, datumReader, fieldSchema, persistent.get( field.pos() ) );
-          persistent.put( field.pos(), v );
-          break;
-        case ENUM:
-          v = AvroUtils.getEnumValue( fieldSchema, (String) sv );
-          persistent.put( field.pos(), v );
-          break;
-        case FIXED:
-          throw new IOException( "???" );
-          // break;
-        case BYTES:
-          persistent.put( field.pos(), ByteBuffer.wrap( (byte[]) sv ) );
-          break;
-        case BOOLEAN:
-        case DOUBLE:
-        case FLOAT:
-        case INT:
-        case LONG:
-          persistent.put( field.pos(), sv );
-          break;
-        case STRING:
-          persistent.put( field.pos(), new Utf8( sv.toString() ) );
-          break;
-        case UNION:
-          LOG.error( "Union is not supported yet" );
-          break;
-        default:
-          LOG.error( "Unknown field type: " + fieldSchema.getType() );
-      }
-      persistent.setDirty( field.pos() );
+
+      Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
+      persistent.put(field.pos(), v);
+      persistent.setDirty(field.pos());
+
     }
     persistent.clearDirty();
     return persistent;
   }
 
+  @SuppressWarnings("rawtypes")
+  private SpecificDatumReader getDatumReader(String schemaId, Schema fieldSchema) {
+    SpecificDatumReader<?> reader = (SpecificDatumReader<?>) readerMap
+        .get(schemaId);
+    if (reader == null) {
+      reader = new SpecificDatumReader(fieldSchema);// ignore dirty bits
+      SpecificDatumReader localReader = null;
+      if ((localReader = readerMap.putIfAbsent(schemaId, reader)) != null) {
+        reader = localReader;
+      }
+    }
+    return reader;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private SpecificDatumWriter getDatumWriter(String schemaId, Schema fieldSchema) {
+    SpecificDatumWriter writer = (SpecificDatumWriter<?>) writerMap
+        .get(schemaId);
+    if (writer == null) {
+      writer = new SpecificDatumWriter(fieldSchema);// ignore dirty bits
+      writerMap.put(schemaId, writer);
+    }
+
+    return writer;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Object deserializeFieldValue(Field field, Schema fieldSchema,
+      Object solrValue, T persistent) throws IOException {
+    Object fieldValue = null;
+    switch (fieldSchema.getType()) {
+    case MAP:
+    case ARRAY:
+    case RECORD:
+      @SuppressWarnings("rawtypes")
+      SpecificDatumReader reader = getDatumReader(fieldSchema.getFullName(),
+          fieldSchema);
+      fieldValue = IOUtils.deserialize((byte[]) solrValue, reader, fieldSchema,
+          persistent.get(field.pos()));
+      break;
+    case ENUM:
+      fieldValue = AvroUtils.getEnumValue(fieldSchema, (String) solrValue);
+      break;
+    case FIXED:
+      throw new IOException("???");
+      // break;
+    case BYTES:
+      fieldValue = ByteBuffer.wrap((byte[]) solrValue);
+      break;
+    case STRING:
+      fieldValue = new Utf8(solrValue.toString());
+      break;
+    case UNION:
+      if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+        // schema [type0, type1]
+        Type type0 = fieldSchema.getTypes().get(0).getType();
+        Type type1 = fieldSchema.getTypes().get(1).getType();
+
+        // Check if types are different and there's a "null", like
+        // ["null","type"] or ["type","null"]
+        if (!type0.equals(type1)) {
+          if (type0.equals(Schema.Type.NULL))
+            fieldSchema = fieldSchema.getTypes().get(1);
+          else
+            fieldSchema = fieldSchema.getTypes().get(0);
+        } else {
+          fieldSchema = fieldSchema.getTypes().get(0);
+        }
+        fieldValue = deserializeFieldValue(field, fieldSchema, solrValue,
+            persistent);
+      } else {
+        @SuppressWarnings("rawtypes")
+        SpecificDatumReader unionReader = getDatumReader(
+            String.valueOf(fieldSchema.hashCode()), fieldSchema);
+        fieldValue = IOUtils.deserialize((byte[]) solrValue, unionReader,
+            fieldSchema, persistent.get(field.pos()));
+        break;
+      }
+      break;
+    default:
+      fieldValue = solrValue;
+    }
+    return fieldValue;
+  }
+
   @Override
-  public void put( K key, T persistent ) {
+  public void put(K key, T persistent) {
     Schema schema = persistent.getSchema();
-    StateManager stateManager = persistent.getStateManager();
-    if ( !stateManager.isDirty( persistent ) ) {
+    if (!persistent.isDirty()) {
       // nothing to do
       return;
     }
     SolrInputDocument doc = new SolrInputDocument();
     // add primary key
-    doc.addField( mapping.getPrimaryKey(), key );
+    doc.addField(mapping.getPrimaryKey(), key);
     // populate the doc
     List<Field> fields = schema.getFields();
-    for ( Field field : fields ) {
-      String sf = mapping.getSolrField( field.name() );
+    for (Field field : fields) {
+      String sf = mapping.getSolrField(field.name());
       // Solr will append values to fields in a SolrInputDocument, even the key
       // mapping won't find the primary
-      if ( sf == null ) {
+      if (sf == null) {
         continue;
       }
       Schema fieldSchema = field.schema();
-      Object v = persistent.get( field.pos() );
-      if ( v == null ) {
+      Object v = persistent.get(field.pos());
+      if (v == null) {
         continue;
       }
-      switch ( fieldSchema.getType() ) {
-        case MAP:
-        case ARRAY:
-        case RECORD:
-          byte[] data = null;
-          try {
-            data = IOUtils.serialize( datumWriter, fieldSchema, v );
-          } catch ( IOException e ) {
-            LOG.error( e.getMessage(), e.getStackTrace().toString() );
-          }
-          doc.addField( sf, data );
-          break;
-        case BYTES:
-          doc.addField( sf, ( (ByteBuffer) v ).array() );
-          break;
-        case ENUM:
-        case STRING:
-          doc.addField( sf, v.toString() );
-          break;
-        case BOOLEAN:
-        case DOUBLE:
-        case FLOAT:
-        case INT:
-        case LONG:
-          doc.addField( sf, v );
-          break;
-        case UNION:
-          LOG.error( "Union is not supported yet" );
-          break;
-        default:
-          LOG.error( "Unknown field type: " + fieldSchema.getType() );
-      }
+      v = serializeFieldValue(fieldSchema, v);
+      doc.addField(sf, v);
+
     }
-    LOG.info( "DOCUMENT: " + doc );
-    batch.add( doc );
-    if ( batch.size() >= batchSize ) {
+    LOG.info("DOCUMENT: " + doc);
+    batch.add(doc);
+    if (batch.size() >= batchSize) {
       try {
-        add( batch, commitWithin );
+        add(batch, commitWithin);
         batch.clear();
-      } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e.getStackTrace().toString());
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Object serializeFieldValue(Schema fieldSchema, Object fieldValue) {
+    switch (fieldSchema.getType()) {
+    case MAP:
+    case ARRAY:
+    case RECORD:
+      byte[] data = null;
+      try {
+        @SuppressWarnings("rawtypes")
+        SpecificDatumWriter writer = getDatumWriter(fieldSchema.getFullName(),
+            fieldSchema);
+        data = IOUtils.serialize(writer, fieldSchema, fieldValue);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e.getStackTrace().toString());
+      }
+      fieldValue = data;
+      break;
+    case BYTES:
+      fieldValue = ((ByteBuffer) fieldValue).array();
+      break;
+    case ENUM:
+    case STRING:
+      fieldValue = fieldValue.toString();
+      break;
+    case UNION:
+      // For a nullable union of exactly two branches ([null, type] or [type, null]),
+      // serialize using the non-null branch's schema; any other union is serialized as raw Avro bytes.
+      if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) {
+        int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+        fieldValue = serializeFieldValue(unionSchema, fieldValue);
+      } else {
+        byte[] serializedData = null;
+        try {
+          @SuppressWarnings("rawtypes")
+          SpecificDatumWriter writer = getDatumWriter(
+              String.valueOf(fieldSchema.hashCode()), fieldSchema);
+          serializedData = IOUtils.serialize(writer, fieldSchema, fieldValue);
+        } catch (IOException e) {
+          LOG.error(e.getMessage(), e.getStackTrace().toString());
+        }
+        fieldValue = serializedData;
+      }
+      break;
+    default:
+      // LOG.error("Unknown field type: " + fieldSchema.getType());
+      break;
+    }
+    return fieldValue;
+  }
+
+  private boolean isNullable(Schema unionSchema) {
+    for (Schema innerSchema : unionSchema.getTypes()) {
+      if (innerSchema.getType().equals(Schema.Type.NULL)) {
+        return true;
       }
     }
+    return false;
+  }
+
+  /**
+   * Given an object and the object schema this function obtains, from within
+   * the UNION schema, the position of the type used. If no data type can be
+   * inferred then we return a default value of position 0.
+   * 
+   * @param pValue
+   * @param pUnionSchema
+   * @return the unionSchemaPosition.
+   */
+  private int getUnionSchema(Object pValue, Schema pUnionSchema) {
+    int unionSchemaPos = 0;
+    Iterator<Schema> it = pUnionSchema.getTypes().iterator();
+    while (it.hasNext()) {
+      Type schemaType = it.next().getType();
+      if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
+        return unionSchemaPos;
+      else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
+        return unionSchemaPos;
+      else if (pValue instanceof Integer && schemaType.equals(Type.INT))
+        return unionSchemaPos;
+      else if (pValue instanceof Long && schemaType.equals(Type.LONG))
+        return unionSchemaPos;
+      else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
+        return unionSchemaPos;
+      else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
+        return unionSchemaPos;
+      else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
+        return unionSchemaPos;
+      else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+        return unionSchemaPos;
+      else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+        return unionSchemaPos;
+      else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
+        return unionSchemaPos;
+      unionSchemaPos++;
+    }
+    // if we weren't able to determine which data type it is, then we return the
+    // default
+    return DEFAULT_UNION_SCHEMA;
   }
 
   @Override
-  public boolean delete( K key ) {
+  public boolean delete(K key) {
     String keyField = mapping.getPrimaryKey();
     try {
-      UpdateResponse rsp = server.deleteByQuery( keyField + ":" + escapeQueryKey( key.toString() ) );
+      UpdateResponse rsp = server.deleteByQuery(keyField + ":"
+          + escapeQueryKey(key.toString()));
       server.commit();
-      LOG.info( rsp.toString() );
+      LOG.info(rsp.toString());
       return true;
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return false;
   }
 
   @Override
-  public long deleteByQuery( Query<K, T> query ) {
-    String q = ( (SolrQuery<K, T>) query ).toSolrQuery();
+  public long deleteByQuery(Query<K, T> query) {
+    String q = ((SolrQuery<K, T>) query).toSolrQuery();
     try {
-      UpdateResponse rsp = server.deleteByQuery( q );
+      UpdateResponse rsp = server.deleteByQuery(q);
       server.commit();
-      LOG.info( rsp.toString() );
-    } catch ( Exception e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      LOG.info(rsp.toString());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return 0;
   }
 
   @Override
-  public Result<K, T> execute( Query<K, T> query ) {
+  public Result<K, T> execute(Query<K, T> query) {
     try {
-      return new SolrResult<K, T>( this, query, server, resultsSize );
-    } catch ( IOException e ) {
-      LOG.error( e.getMessage(), e.getStackTrace().toString() );
+      return new SolrResult<K, T>(this, query, server, resultsSize);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e.getStackTrace().toString());
     }
     return null;
   }
 
   @Override
   public Query<K, T> newQuery() {
-    return new SolrQuery<K, T>( this );
+    return new SolrQuery<K, T>(this);
   }
 
   @Override
-  public List<PartitionQuery<K, T>> getPartitions( Query<K, T> query )
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
       throws IOException {
     // TODO: implement this using Hadoop DB support
 
@@ -482,11 +645,11 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   @Override
   public void flush() {
     try {
-      if ( batch.size() > 0 ) {
-        add( batch, commitWithin );
+      if (batch.size() > 0) {
+        add(batch, commitWithin);
         batch.clear();
       }
-    } catch ( Exception e ) {
+    } catch (Exception e) {
       LOG.error(e.getMessage(), e.getStackTrace());
     }
   }
@@ -495,15 +658,16 @@ public class SolrStore<K, T extends PersistentBase> extends DataStoreBase<K, T>
   public void close() {
     // In testing, the index gets closed before the commit in flush() can happen
     // so an exception gets thrown
-    //flush();
+    // flush();
   }
-    
-  private void add(ArrayList<SolrInputDocument> batch, int commitWithin) throws SolrServerException, IOException {
+
+  private void add(ArrayList<SolrInputDocument> batch, int commitWithin)
+      throws SolrServerException, IOException {
     if (commitWithin == 0) {
-      server.add( batch );
-      server.commit( false, true, true );
+      server.add(batch);
+      server.commit(false, true, true);
     } else {
-      server.add( batch, commitWithin );            
+      server.add(batch, commitWithin);
     }
-  }  
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/src/test/conf/gora-solr-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-solr/src/test/conf/gora-solr-mapping.xml b/gora-solr/src/test/conf/gora-solr-mapping.xml
index c90ab10..2f20b69 100644
--- a/gora-solr/src/test/conf/gora-solr-mapping.xml
+++ b/gora-solr/src/test/conf/gora-solr-mapping.xml
@@ -22,6 +22,8 @@
     <field name="name" column="name"/>
     <field name="dateOfBirth" column="dateOfBirth"/>
     <field name="salary" column="salary"/>
+    <field name="boss" column="boss"/>
+    <field name="webpage" column="webpage"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">
@@ -29,6 +31,7 @@
     <field name="content" column="content"/>
     <field name="parsedContent" column="parsedContent"/>
     <field name="outlinks" column="outlinks"/>
+    <field name="headers" column="headers"/>     
     <field name="metadata" column="metadata"/>
   </class>
 </gora-orm>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/src/test/conf/gora.properties
----------------------------------------------------------------------
diff --git a/gora-solr/src/test/conf/gora.properties b/gora-solr/src/test/conf/gora.properties
index 081587e..2fcddaf 100644
--- a/gora-solr/src/test/conf/gora.properties
+++ b/gora-solr/src/test/conf/gora.properties
@@ -1 +1,2 @@
 gora.solrstore.solr.url=http://localhost:9876/solr
+gora.datastore.solr.commitWithin=0

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
----------------------------------------------------------------------
diff --git a/gora-solr/src/test/conf/solr/Employee/conf/schema.xml b/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
index 4d83f7b..9295c61 100644
--- a/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
+++ b/gora-solr/src/test/conf/solr/Employee/conf/schema.xml
@@ -28,7 +28,9 @@
     <field name="name"        type="string" indexed="true" stored="true" />
     <field name="dateOfBirth" type="long" stored="true" /> 
     <field name="salary"      type="int" stored="true" /> 
-
+    <field name="boss"        type="binary" stored="true" />
+    <field name="webpage"     type="binary" stored="true" />
+    
   </fields>
 
   <uniqueKey>ssn</uniqueKey>
@@ -37,6 +39,7 @@
     <fieldType name="string" class="solr.StrField" sortMissingLast="true" />
     <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
     <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+    <fieldtype name="binary" class="solr.BinaryField"/>
   </types>  
 
 </schema>

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
----------------------------------------------------------------------
diff --git a/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml b/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
index 2fef161..48bde7c 100644
--- a/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
+++ b/gora-solr/src/test/conf/solr/WebPage/conf/schema.xml
@@ -28,7 +28,8 @@
     <field name="parsedContent" type="binary" stored="true" /> 
     <field name="content"       type="binary" stored="true" /> 
     <field name="outlinks"      type="binary" stored="true" /> 
-    <field name="metadata"      type="binary" stored="true" />    
+    <field name="headers"       type="binary" stored="true" />     
+    <field name="metadata"      type="binary" stored="true" />
 
   </fields>
 

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
----------------------------------------------------------------------
diff --git a/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java b/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
index 2bfd469..95a7078 100644
--- a/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
+++ b/gora-solr/src/test/java/org/apache/gora/solr/store/TestSolrStore.java
@@ -25,6 +25,7 @@ import org.apache.gora.solr.GoraSolrTestDriver;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.DataStoreTestBase;
+import org.junit.Ignore;
 
 public class TestSolrStore extends DataStoreTestBase {
   
@@ -49,46 +50,7 @@ public class TestSolrStore extends DataStoreTestBase {
   }
 
 
-  public void testGetRecursive() {
-  }
-
-  public void testGetDoubleRecursive() {
-  }
-
-  public void testGetNested() {
-  }
-
-  public void testGet3UnionField() {
-  }
-
-  public void testQuery() {
-  }
-
-  public void testQueryStartKey() {
-  }
-
-  public void testQueryEndKey() {
-  }
-
-  public void testQueryKeyRange() {
-  }
-
-  public void testQueryWebPageSingleKey() {
-  }
-
-  public void testQueryWebPageSingleKeyDefaultFields() {
-  }
-
-  public void testDeleteByQuery() {
-  }
-
-  public void testGetPartitions() {
-  }
-
-  public void testUpdate() {
-  }
-
-  public void testDeleteByQueryFields() {
-  }
-
+  @Ignore("GORA-310 and GORA-311 issues are not fixed at SolrStore")
+  @Override
+  public void testDeleteByQueryFields() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-tutorial/pom.xml
----------------------------------------------------------------------
diff --git a/gora-tutorial/pom.xml b/gora-tutorial/pom.xml
index 1ca2487..5e66e7e 100644
--- a/gora-tutorial/pom.xml
+++ b/gora-tutorial/pom.xml
@@ -128,7 +128,7 @@
         </dependency>
         
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
+            <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
         </dependency>
         

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-tutorial/src/main/avro/metricdatum.json
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/avro/metricdatum.json b/gora-tutorial/src/main/avro/metricdatum.json
index 9fa61c0..f65eab5 100644
--- a/gora-tutorial/src/main/avro/metricdatum.json
+++ b/gora-tutorial/src/main/avro/metricdatum.json
@@ -1,10 +1,10 @@
 {
   "type": "record",
-  "name": "MetricDatum",
+  "name": "MetricDatum", "default":null,
   "namespace": "org.apache.gora.tutorial.log.generated",
   "fields" : [
-    {"name": "metricDimension", "type": "string"},
-    {"name": "timestamp", "type": "long"},
-    {"name": "metric", "type" : "long"}
+    {"name": "metricDimension", "type": ["null","string"], "default":null},
+    {"name": "timestamp", "type": "long","default":0},
+    {"name": "metric", "type" : "long","default":0}
   ]
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-tutorial/src/main/avro/pageview.json
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/avro/pageview.json b/gora-tutorial/src/main/avro/pageview.json
index 34c08b3..b7da828 100644
--- a/gora-tutorial/src/main/avro/pageview.json
+++ b/gora-tutorial/src/main/avro/pageview.json
@@ -1,15 +1,15 @@
 {
   "type": "record",
-  "name": "Pageview",
+  "name": "Pageview", "default":null,
   "namespace": "org.apache.gora.tutorial.log.generated",
   "fields" : [
-    {"name": "url", "type": "string"},
-    {"name": "timestamp", "type": "long"},
-    {"name": "ip", "type": "string"},
-    {"name": "httpMethod", "type": "string"},
-    {"name": "httpStatusCode", "type": "int"},
-    {"name": "responseSize", "type": "int"},
-    {"name": "referrer", "type": "string"},
-    {"name": "userAgent", "type": "string"}
+    {"name": "url", "type": ["null","string"], "default":null},
+    {"name": "timestamp", "type": "long", "default":0},
+    {"name": "ip", "type": ["null","string"], "default":null},
+    {"name": "httpMethod", "type": ["null","string"], "default":null},
+    {"name": "httpStatusCode", "type": "int", "default":0},
+    {"name": "responseSize", "type": "int", "default":0},
+    {"name": "referrer", "type": ["null","string"], "default":null},
+    {"name": "userAgent", "type": ["null","string"], "default":null}
   ]
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalytics.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalytics.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalytics.java
index 9106932..6691fd6 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalytics.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalytics.java
@@ -77,7 +77,7 @@ public class LogAnalytics extends Configured implements Tool {
     protected void map(Long key, Pageview pageview, Context context)
         throws IOException ,InterruptedException {
       
-      Utf8 url = pageview.getUrl();
+      CharSequence url = pageview.getUrl();
       long day = getDay(pageview.getTimestamp());
       
       tuple.getKey().set(url.toString());

http://git-wip-us.apache.org/repos/asf/gora/blob/136fc595/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java
index 6150c15..600919c 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/generated/MetricDatum.java
@@ -1,111 +1,365 @@
 /**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the"
- *License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
-  * http://www.apache.org/licenses/LICENSE-2.0
+ * Autogenerated by Avro
  * 
- *Unless required by applicable law or agreed to in writing, software
- *distributed under the License is distributed on an "AS IS" BASIS,
- *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *See the License for the specific language governing permissions and
- *limitations under the License.
+ * DO NOT EDIT DIRECTLY
  */
-
-package org.apache.gora.tutorial.log.generated;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.avro.Protocol;
-import org.apache.avro.Schema;
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Protocol;
-import org.apache.avro.util.Utf8;
-import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.FixedSize;
-import org.apache.avro.specific.SpecificExceptionBase;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificFixed;
-import org.apache.gora.persistency.StateManager;
-import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.persistency.impl.StateManagerImpl;
-import org.apache.gora.persistency.StatefulHashMap;
-import org.apache.gora.persistency.ListGenericArray;
-
+package org.apache.gora.tutorial.log.generated;  
 @SuppressWarnings("all")
-public class MetricDatum extends PersistentBase {
-  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"MetricDatum\",\"namespace\":\"org.apache.gora.tutorial.log.generated\",\"fields\":[{\"name\":\"metricDimension\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"metric\",\"type\":\"long\"}]}");
-  public static enum Field {
-    METRIC_DIMENSION(0,"metricDimension"),
-    TIMESTAMP(1,"timestamp"),
-    METRIC(2,"metric"),
-    ;
-    private int index;
-    private String name;
-    Field(int index, String name) {this.index=index;this.name=name;}
-    public int getIndex() {return index;}
-    public String getName() {return name;}
-    public String toString() {return name;}
-  };
-  public static final String[] _ALL_FIELDS = {"metricDimension","timestamp","metric",};
-  static {
-    PersistentBase.registerFields(MetricDatum.class, _ALL_FIELDS);
-  }
-  private Utf8 metricDimension;
+public class MetricDatum extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricDatum\",\"namespace\":\"org.apache.gora.tutorial.log.generated\",\"fields\":[{\"name\":\"__g__dirty\",\"type\":\"bytes\",\"doc\":\"Bytes used to represent weather or not a field is dirty.\",\"default\":\"AA==\"},{\"name\":\"metricDimension\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"metric\",\"type\":\"long\",\"default\":0}]}");
+  /** Bytes used to represent weather or not a field is dirty. */
+  private java.nio.ByteBuffer __g__dirty = java.nio.ByteBuffer.wrap(new byte[1]);
+  private java.lang.CharSequence metricDimension;
   private long timestamp;
   private long metric;
-  public MetricDatum() {
-    this(new StateManagerImpl());
-  }
-  public MetricDatum(StateManager stateManager) {
-    super(stateManager);
-  }
-  public MetricDatum newInstance(StateManager stateManager) {
-    return new MetricDatum(stateManager);
-  }
-  public Schema getSchema() { return _SCHEMA; }
-  public Object get(int _field) {
-    switch (_field) {
-    case 0: return metricDimension;
-    case 1: return timestamp;
-    case 2: return metric;
-    default: throw new AvroRuntimeException("Bad index");
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return __g__dirty;
+    case 1: return metricDimension;
+    case 2: return timestamp;
+    case 3: return metric;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
+  
+  // Used by DatumReader.  Applications should not call. 
   @SuppressWarnings(value="unchecked")
-  public void put(int _field, Object _value) {
-    if(isFieldEqual(_field, _value)) return;
-    getStateManager().setDirty(this, _field);
-    switch (_field) {
-    case 0:metricDimension = (Utf8)_value; break;
-    case 1:timestamp = (Long)_value; break;
-    case 2:metric = (Long)_value; break;
-    default: throw new AvroRuntimeException("Bad index");
+  public void put(int field$, java.lang.Object value) {
+    switch (field$) {
+    case 0: __g__dirty = (java.nio.ByteBuffer)(value); break;
+    case 1: metricDimension = (java.lang.CharSequence)(value); break;
+    case 2: timestamp = (java.lang.Long)(value); break;
+    case 3: metric = (java.lang.Long)(value); break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
-  public Utf8 getMetricDimension() {
-    return (Utf8) get(0);
+
+  /**
+   * Gets the value of the 'metricDimension' field.
+   */
+  public java.lang.CharSequence getMetricDimension() {
+    return metricDimension;
+  }
+
+  /**
+   * Sets the value of the 'metricDimension' field.
+   * @param value the value to set.
+   */
+  public void setMetricDimension(java.lang.CharSequence value) {
+    this.metricDimension = value;
+    setDirty(1);
   }
-  public void setMetricDimension(Utf8 value) {
-    put(0, value);
+  
+  /**
+   * Checks the dirty status of the 'metricDimension' field. A field is dirty if it represents a change that has not yet been written to the database.
+   * @param value the value to set.
+   */
+  public boolean isMetricDimensionDirty(java.lang.CharSequence value) {
+    return isDirty(1);
   }
-  public long getTimestamp() {
-    return (Long) get(1);
+
+  /**
+   * Gets the value of the 'timestamp' field.
+   */
+  public java.lang.Long getTimestamp() {
+    return timestamp;
   }
-  public void setTimestamp(long value) {
-    put(1, value);
+
+  /**
+   * Sets the value of the 'timestamp' field.
+   * @param value the value to set.
+   */
+  public void setTimestamp(java.lang.Long value) {
+    this.timestamp = value;
+    setDirty(2);
   }
-  public long getMetric() {
-    return (Long) get(2);
+  
+  /**
+   * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+   * @param value the value to set.
+   */
+  public boolean isTimestampDirty(java.lang.Long value) {
+    return isDirty(2);
   }
-  public void setMetric(long value) {
-    put(2, value);
+
+  /**
+   * Gets the value of the 'metric' field.
+   */
+  public java.lang.Long getMetric() {
+    return metric;
+  }
+
+  /**
+   * Sets the value of the 'metric' field.
+   * @param value the value to set.
+   */
+  public void setMetric(java.lang.Long value) {
+    this.metric = value;
+    setDirty(3);
+  }
+  
+  /**
+   * Checks the dirty status of the 'metric' field. A field is dirty if it represents a change that has not yet been written to the database.
+   * @param value the value to set.
+   */
+  public boolean isMetricDirty(java.lang.Long value) {
+    return isDirty(3);
+  }
+
+  /** Creates a new MetricDatum RecordBuilder */
+  public static org.apache.gora.tutorial.log.generated.MetricDatum.Builder newBuilder() {
+    return new org.apache.gora.tutorial.log.generated.MetricDatum.Builder();
+  }
+  
+  /** Creates a new MetricDatum RecordBuilder by copying an existing Builder */
+  public static org.apache.gora.tutorial.log.generated.MetricDatum.Builder newBuilder(org.apache.gora.tutorial.log.generated.MetricDatum.Builder other) {
+    return new org.apache.gora.tutorial.log.generated.MetricDatum.Builder(other);
+  }
+  
+  /** Creates a new MetricDatum RecordBuilder by copying an existing MetricDatum instance */
+  public static org.apache.gora.tutorial.log.generated.MetricDatum.Builder newBuilder(org.apache.gora.tutorial.log.generated.MetricDatum other) {
+    return new org.apache.gora.tutorial.log.generated.MetricDatum.Builder(other);
+  }
+  
+  private static java.nio.ByteBuffer deepCopyToWriteOnlyBuffer(
+      java.nio.ByteBuffer input) {
+    java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity());
+    int position = input.position();
+    input.reset();
+    int mark = input.position();
+    int limit = input.limit();
+    input.rewind();
+    input.limit(input.capacity());
+    copy.put(input);
+    input.rewind();
+    copy.rewind();
+    input.position(mark);
+    input.mark();
+    copy.position(mark);
+    copy.mark();
+    input.position(position);
+    copy.position(position);
+    input.limit(limit);
+    copy.limit(limit);
+    return copy.asReadOnlyBuffer();
+  }
+  
+  /**
+   * RecordBuilder for MetricDatum instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MetricDatum>
+    implements org.apache.avro.data.RecordBuilder<MetricDatum> {
+
+    private java.nio.ByteBuffer __g__dirty;
+    private java.lang.CharSequence metricDimension;
+    private long timestamp;
+    private long metric;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.gora.tutorial.log.generated.MetricDatum.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.gora.tutorial.log.generated.MetricDatum.Builder other) {
+      super(other);
+    }
+    
+    /** Creates a Builder by copying an existing MetricDatum instance */
+    private Builder(org.apache.gora.tutorial.log.generated.MetricDatum other) {
+            super(org.apache.gora.tutorial.log.generated.MetricDatum.SCHEMA$);
+      if (isValidValue(fields()[0], other.__g__dirty)) {
+        this.__g__dirty = (java.nio.ByteBuffer) data().deepCopy(fields()[0].schema(), other.__g__dirty);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.metricDimension)) {
+        this.metricDimension = (java.lang.CharSequence) data().deepCopy(fields()[1].schema(), other.metricDimension);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.timestamp)) {
+        this.timestamp = (java.lang.Long) data().deepCopy(fields()[2].schema(), other.timestamp);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.metric)) {
+        this.metric = (java.lang.Long) data().deepCopy(fields()[3].schema(), other.metric);
+        fieldSetFlags()[3] = true;
+      }
+    }
+
+    /** Gets the value of the 'metricDimension' field */
+    public java.lang.CharSequence getMetricDimension() {
+      return metricDimension;
+    }
+    
+    /** Sets the value of the 'metricDimension' field */
+    public org.apache.gora.tutorial.log.generated.MetricDatum.Builder setMetricDimension(java.lang.CharSequence value) {
+      validate(fields()[1], value);
+      this.metricDimension = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'metricDimension' field has been set */
+    public boolean hasMetricDimension() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'metricDimension' field */
+    public org.apache.gora.tutorial.log.generated.MetricDatum.Builder clearMetricDimension() {
+      metricDimension = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+    
+    /** Gets the value of the 'timestamp' field */
+    public java.lang.Long getTimestamp() {
+      return timestamp;
+    }
+    
+    /** Sets the value of the 'timestamp' field */
+    public org.apache.gora.tutorial.log.generated.MetricDatum.Builder setTimestamp(long value) {
+      validate(fields()[2], value);
+      this.timestamp = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'timestamp' field has been set */
+    public boolean hasTimestamp() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'timestamp' field */
+    public org.apache.gora.tutorial.log.generated.MetricDatum.Builder clearTimestamp() {
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+    
+    /** Gets the value of the 'metric' field */
+    public java.lang.Long getMetric() {
+      return metric;
+    }
+    
+    /** Sets the value of the 'metric' field */
+    public org.apache.gora.tutorial.log.generated.MetricDatum.Builder setMetric(long value) {
+      validate(fields()[3], value);
+      this.metric = value;
+      fieldSetFlags()[3] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'metric' field has been set */
+    public boolean hasMetric() {
+      return fieldSetFlags()[3];
+    }
+    
+    /** Clears the value of the 'metric' field */
+    public org.apache.gora.tutorial.log.generated.MetricDatum.Builder clearMetric() {
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+    
+    @Override
+    public MetricDatum build() {
+      try {
+        MetricDatum record = new MetricDatum();
+        record.__g__dirty = fieldSetFlags()[0] ? this.__g__dirty : (java.nio.ByteBuffer) java.nio.ByteBuffer.wrap(new byte[1]);
+        record.metricDimension = fieldSetFlags()[1] ? this.metricDimension : (java.lang.CharSequence) defaultValue(fields()[1]);
+        record.timestamp = fieldSetFlags()[2] ? this.timestamp : (java.lang.Long) defaultValue(fields()[2]);
+        record.metric = fieldSetFlags()[3] ? this.metric : (java.lang.Long) defaultValue(fields()[3]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+  
+  public MetricDatum.Tombstone getTombstone(){
+  	return TOMBSTONE;
+  }
+
+  public MetricDatum newInstance(){
+    return newBuilder().build();
+  }
+
+  private static final Tombstone TOMBSTONE = new Tombstone();
+  
+  public static final class Tombstone extends MetricDatum implements org.apache.gora.persistency.Tombstone {
+  
+      private Tombstone() { }
+  
+	  				  /**
+	   * Gets the value of the 'metricDimension' field.
+		   */
+	  public java.lang.CharSequence getMetricDimension() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the 'metricDimension' field.
+		   * @param value the value to set.
+	   */
+	  public void setMetricDimension(java.lang.CharSequence value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the 'metricDimension' field. A field is dirty if it represents a change that has not yet been written to the database.
+		   * @param value the value to set.
+	   */
+	  public boolean isMetricDimensionDirty(java.lang.CharSequence value) {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+				  /**
+	   * Gets the value of the 'timestamp' field.
+		   */
+	  public java.lang.Long getTimestamp() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the 'timestamp' field.
+		   * @param value the value to set.
+	   */
+	  public void setTimestamp(java.lang.Long value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+		   * @param value the value to set.
+	   */
+	  public boolean isTimestampDirty(java.lang.Long value) {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+				  /**
+	   * Gets the value of the 'metric' field.
+		   */
+	  public java.lang.Long getMetric() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the 'metric' field.
+		   * @param value the value to set.
+	   */
+	  public void setMetric(java.lang.Long value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the 'metric' field. A field is dirty if it represents a change that has not yet been written to the database.
+		   * @param value the value to set.
+	   */
+	  public boolean isMetricDirty(java.lang.Long value) {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+		  
   }
-}
+  
+}
\ No newline at end of file


Mime
View raw message