gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lewi...@apache.org
Subject svn commit: r1405417 [1/4] - in /gora/trunk: ./ bin/ gora-accumulo/src/main/java/org/apache/gora/accumulo/query/ gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/sr...
Date Sat, 03 Nov 2012 21:09:14 GMT
Author: lewismc
Date: Sat Nov  3 21:09:11 2012
New Revision: 1405417

URL: http://svn.apache.org/viewvc?rev=1405417&view=rev
Log:
GORA-103 Datastore for gora-dynamodb

Added:
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/BeanFactoryWSImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/PersistentWSBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/StateManagerWSImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/ResultWSBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/WebServiceBackedDataStore.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/ws/
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/ws/impl/
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/store/WSDataStoreTestBase.java
Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/bin/gora
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumReader.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumWriter.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroQuery.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroResult.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/DataFileAvroResult.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/Persistent.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/StateManager.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/StateManagerImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/PartitionQuery.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/Result.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStore.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/avro/TestPersistentDatumReader.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/avro/store/TestAvroStore.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/persistency/impl/TestPersistentBase.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestBase.java
    gora/trunk/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseGetResult.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseResult.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseScannerResult.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/query/SqlQuery.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/query/SqlResult.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/statement/HSqlInsertUpdateStatement.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/statement/InsertUpdateStatement.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/statement/InsertUpdateStatementFactory.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/statement/MySqlInsertUpdateStatement.java
    gora/trunk/gora-sql/src/main/java/org/apache/gora/sql/store/SqlStore.java
    gora/trunk/gora-sql/src/test/java/org/apache/gora/sql/store/TestSqlStore.java
    gora/trunk/gora-tutorial/conf/gora.properties
    gora/trunk/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogManager.java
    gora/trunk/pom.xml

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Sat Nov  3 21:09:11 2012
@@ -6,6 +6,8 @@ Gora Change Log
 
 trunk (current development)
 
+* GORA-103 Datastore for gora-dynamodb (rmarroquin via lewismc)
+
 * GORA-160 Gora Fails to Import Into Recent Versions of Eclipse (Ed Kohlwey via lewismc)
 
 * GORA-85 Implement "Usage" messages for SpecificCompiler and LogAnalytics (lewismc)

Modified: gora/trunk/bin/gora
URL: http://svn.apache.org/viewvc/gora/trunk/bin/gora?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/bin/gora (original)
+++ gora/trunk/bin/gora Sat Nov  3 21:09:11 2012
@@ -43,8 +43,9 @@ done
 if [ $# = 0 ]; then
   echo "Usage: run COMMAND [COMMAND options]"
   echo "where COMMAND is one of:"
-  echo "  compile                    Run Compiler"
+  echo "  goracompiler               Run Compiler"
   echo "  specificcompiler           Run Avro Specific Compiler"
+  echo "  dynamocompiler             Run Gora DynamoDB Compiler"
   echo "  logmanager                 Run the tutorial log manager"
   echo "  loganalytics               Run the tutorial log analytics"
   echo "  junit         	     Run the given JUnit test"
@@ -107,7 +108,7 @@ fi
 #GORA_OPTS="$GORA_OPTS -Dhadoop.log.file=$GORA_LOGFILE"
 
 # figure out which class to run
-if [ "$COMMAND" = "compile" ] ; then
+if [ "$COMMAND" = "goracompiler" ] ; then
   MODULE=gora-core
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
   CLASS=org.apache.gora.compiler.GoraCompiler
@@ -115,6 +116,10 @@ elif [ "$COMMAND" = "specificcompiler" ]
   MODULE=gora-core
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
   CLASS=org.apache.avro.specific.SpecificCompiler
+elif [ "$COMMAND" = "dynamocompiler" ] ; then
+  MODULE=gora-dynamodb
+  CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
+  CLASS=org.apache.gora.dynamodb.compiler.GoraDynamoDBCompiler
 elif [ "$COMMAND" = "logmanager" ] ; then
   MODULE=gora-tutorial
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
@@ -125,7 +130,7 @@ elif [ "$COMMAND" = "loganalytics" ] ; t
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
 elif [ "$COMMAND" = "junit" ] ; then
   MODULE=*
-  CLASSPATH=$CLASSPATH:target/test-classes/
+  CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/test-classes/
   CLASS=junit.textui.TestRunner
 else
   MODULE="$COMMAND"

Modified: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java (original)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java Sat Nov  3 21:09:11 2012
@@ -16,14 +16,14 @@
  */
 package org.apache.gora.accumulo.query;
 
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.impl.QueryBase;
 import org.apache.gora.store.DataStore;
 
 /**
  * 
  */
-public class AccumuloQuery<K,T extends Persistent> extends QueryBase<K,T> {
+public class AccumuloQuery<K,T extends PersistentBase> extends QueryBase<K,T> {
   
   public AccumuloQuery() {
     super(null);

Modified: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java (original)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java Sat Nov  3 21:09:11 2012
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Byt
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.gora.accumulo.store.AccumuloStore;
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
 import org.apache.gora.store.DataStore;
@@ -34,7 +34,7 @@ import org.apache.gora.store.DataStore;
 /**
  * 
  */
-public class AccumuloResult<K,T extends Persistent> extends ResultBase<K,T> {
+public class AccumuloResult<K,T extends PersistentBase> extends ResultBase<K,T> {
   
   private RowIterator iterator;
 

Modified: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java (original)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java Sat Nov  3 21:09:11 2012
@@ -81,11 +81,11 @@ import org.apache.gora.accumulo.encoders
 import org.apache.gora.accumulo.query.AccumuloQuery;
 import org.apache.gora.accumulo.query.AccumuloResult;
 import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.State;
 import org.apache.gora.persistency.StateManager;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -97,11 +97,13 @@ import org.apache.hadoop.io.Text;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
  */
-public class AccumuloStore<K,T extends Persistent> extends DataStoreBase<K,T> {
+public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T> {
   
   protected static final String MOCK_PROPERTY = "accumulo.mock";
   protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
@@ -116,6 +118,8 @@ public class AccumuloStore<K,T extends P
   private AuthInfo authInfo;
   private Encoder encoder;
   
+  public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class);
+  
   public Object fromBytes(Schema schema, byte data[]) {
     return fromBytes(encoder, schema, data);
   }
@@ -233,46 +237,51 @@ public class AccumuloStore<K,T extends P
   }
 
   @Override
-  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws IOException {
-    super.initialize(keyClass, persistentClass, properties);
-
-    String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
-    String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
-    String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
-    String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
-    
-    mapping = readMapping(mappingFile);
-
-    if (mapping.encoder == null || mapping.encoder.equals("")) {
-      encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
-    } else {
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+    try{
+      super.initialize(keyClass, persistentClass, properties);
+  
+      String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
+      String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
+      String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
+      String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
+      
+      mapping = readMapping(mappingFile);
+  
+      if (mapping.encoder == null || mapping.encoder.equals("")) {
+        encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
+      } else {
+        try {
+          encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
+        } catch (InstantiationException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new IOException(e);
+        } catch (ClassNotFoundException e) {
+          throw new IOException(e);
+        }
+      }
+  
       try {
-        encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
-      } catch (InstantiationException e) {
-        throw new IOException(e);
-      } catch (IllegalAccessException e) {
+        if (mock == null || !mock.equals("true")) {
+          String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
+          String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
+          conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
+          authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
+        } else {
+          conn = new MockInstance().getConnector(user, password);
+        }
+  
+        if (autoCreateSchema)
+          createSchema();
+      } catch (AccumuloException e) {
         throw new IOException(e);
-      } catch (ClassNotFoundException e) {
+      } catch (AccumuloSecurityException e) {
         throw new IOException(e);
       }
-    }
-
-    try {
-      if (mock == null || !mock.equals("true")) {
-        String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
-        String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
-        conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
-        authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
-      } else {
-        conn = new MockInstance().getConnector(user, password);
-      }
-
-      if (autoCreateSchema)
-        createSchema();
-    } catch (AccumuloException e) {
-      throw new IOException(e);
-    } catch (AccumuloSecurityException e) {
-      throw new IOException(e);
+    }catch(IOException e){
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     }
   }
   
@@ -341,7 +350,7 @@ public class AccumuloStore<K,T extends P
   }
   
   @Override
-  public void createSchema() throws IOException {
+  public void createSchema() {
     try {
       conn.tableOperations().create(mapping.tableName);
       Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
@@ -350,32 +359,38 @@ public class AccumuloStore<K,T extends P
       }
 
     } catch (AccumuloException e) {
-      throw new IOException(e);
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     } catch (AccumuloSecurityException e) {
-      throw new IOException(e);
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     } catch (TableExistsException e) {
-      return;
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     }
   }
 
   @Override
-  public void deleteSchema() throws IOException {
+  public void deleteSchema() {
     try {
       if (batchWriter != null)
         batchWriter.close();
       batchWriter = null;
       conn.tableOperations().delete(mapping.tableName);
     } catch (AccumuloException e) {
-      throw new IOException(e);
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     } catch (AccumuloSecurityException e) {
-      throw new IOException(e);
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     } catch (TableNotFoundException e) {
-      return;
-    }
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+    } 
   }
 
   @Override
-  public boolean schemaExists() throws IOException {
+  public boolean schemaExists() {
     return conn.tableOperations().exists(mapping.tableName);
   }
 
@@ -475,7 +490,7 @@ public class AccumuloStore<K,T extends P
   }
 
   @Override
-  public T get(K key, String[] fields) throws IOException {
+  public T get(K key, String[] fields) {
     try {
       // TODO make isolated scanner optional?
       Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
@@ -490,103 +505,115 @@ public class AccumuloStore<K,T extends P
         return null;
       return persistent;
     } catch (TableNotFoundException e) {
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+      return null;
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
       return null;
     }
   }
   
   @Override
-  public void put(K key, T val) throws IOException {
+  public void put(K key, T val) {
 
-    Mutation m = new Mutation(new Text(toBytes(key)));
-    
-    Schema schema = val.getSchema();
-    StateManager stateManager = val.getStateManager();
-    
-    Iterator<Field> iter = schema.getFields().iterator();
-    
-    int count = 0;
-    for (int i = 0; iter.hasNext(); i++) {
-      Field field = iter.next();
-      if (!stateManager.isDirty(val, i)) {
-        continue;
-      }
+    try{
+      Mutation m = new Mutation(new Text(toBytes(key)));
       
-      Object o = val.get(i);
-      Pair<Text,Text> col = mapping.fieldMap.get(field.name());
-
-      switch (field.schema().getType()) {
-        case MAP:
-          if (o instanceof StatefulMap) {
-            StatefulMap map = (StatefulMap) o;
-            Set<?> es = map.states().entrySet();
-            for (Object entry : es) {
-              Object mapKey = ((Entry) entry).getKey();
-              State state = (State) ((Entry) entry).getValue();
-
-              switch (state) {
-                case NEW:
-                case DIRTY:
-                  m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
-                  count++;
-                  break;
-                case DELETED:
-                  m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
-                  count++;
-                  break;
+      Schema schema = val.getSchema();
+      StateManager stateManager = val.getStateManager();
+      
+      Iterator<Field> iter = schema.getFields().iterator();
+      
+      int count = 0;
+      for (int i = 0; iter.hasNext(); i++) {
+        Field field = iter.next();
+        if (!stateManager.isDirty(val, i)) {
+          continue;
+        }
+        
+        Object o = val.get(i);
+        Pair<Text,Text> col = mapping.fieldMap.get(field.name());
+  
+        switch (field.schema().getType()) {
+          case MAP:
+            if (o instanceof StatefulMap) {
+              StatefulMap map = (StatefulMap) o;
+              Set<?> es = map.states().entrySet();
+              for (Object entry : es) {
+                Object mapKey = ((Entry) entry).getKey();
+                State state = (State) ((Entry) entry).getValue();
+  
+                switch (state) {
+                  case NEW:
+                  case DIRTY:
+                    m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
+                    count++;
+                    break;
+                  case DELETED:
+                    m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
+                    count++;
+                    break;
+                }
+                
+              }
+            } else {
+              Map map = (Map) o;
+              Set<?> es = map.entrySet();
+              for (Object entry : es) {
+                Object mapKey = ((Entry) entry).getKey();
+                Object mapVal = ((Entry) entry).getValue();
+                m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
+                count++;
               }
-              
             }
-          } else {
-            Map map = (Map) o;
-            Set<?> es = map.entrySet();
-            for (Object entry : es) {
-              Object mapKey = ((Entry) entry).getKey();
-              Object mapVal = ((Entry) entry).getValue();
-              m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
+            break;
+          case ARRAY:
+            GenericArray array = (GenericArray) o;
+            int j = 0;
+            for (Object item : array) {
+              m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
               count++;
             }
-          }
-          break;
-        case ARRAY:
-          GenericArray array = (GenericArray) o;
-          int j = 0;
-          for (Object item : array) {
-            m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+            break;
+          case RECORD:
+            SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
+            ByteArrayOutputStream os = new ByteArrayOutputStream();
+            BinaryEncoder encoder = new BinaryEncoder(os);
+            writer.write(o, encoder);
+            encoder.flush();
+            m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+            break;
+          default:
+            m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
             count++;
-          }
-          break;
-        case RECORD:
-          SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
-          ByteArrayOutputStream os = new ByteArrayOutputStream();
-          BinaryEncoder encoder = new BinaryEncoder(os);
-          writer.write(o, encoder);
-          encoder.flush();
-          m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
-          break;
-        default:
-          m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
-          count++;
+        }
+  
       }
-
+      
+      if (count > 0)
+        try {
+          getBatchWriter().addMutation(m);
+        } catch (MutationsRejectedException e) {
+          LOG.error(e.getMessage());
+          LOG.error(e.getStackTrace().toString());
+        }
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     }
-    
-    if (count > 0)
-      try {
-        getBatchWriter().addMutation(m);
-      } catch (MutationsRejectedException e) {
-        throw new IOException(e);
-      }
   }
   
   @Override
-  public boolean delete(K key) throws IOException {
+  public boolean delete(K key) {
     Query<K,T> q = newQuery();
     q.setKey(key);
     return deleteByQuery(q) > 0;
   }
 
   @Override
-  public long deleteByQuery(Query<K,T> query) throws IOException {
+  public long deleteByQuery(Query<K,T> query) {
     try {
       Scanner scanner = createScanner(query);
       // add iterator that drops values on the server side
@@ -613,9 +640,17 @@ public class AccumuloStore<K,T extends P
       return count;
     } catch (TableNotFoundException e) {
       // TODO return 0?
-      throw new IOException(e);
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+      return 0;
     } catch (MutationsRejectedException e) {
-      throw new IOException(e);
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+      return 0;
+    } catch (IOException e){
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+      return 0;
     }
   }
 
@@ -654,14 +689,16 @@ public class AccumuloStore<K,T extends P
   }
 
   @Override
-  public Result<K,T> execute(Query<K,T> query) throws IOException {
+  public Result<K,T> execute(Query<K,T> query) {
     try {
       Scanner scanner = createScanner(query);
       return new AccumuloResult<K,T>(this, query, scanner);
     } catch (TableNotFoundException e) {
       // TODO return empty result?
-      throw new IOException(e);
-    }
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+      return null;
+    } 
   }
   
   @Override
@@ -817,26 +854,27 @@ public class AccumuloStore<K,T extends P
   }
 
   @Override
-  public void flush() throws IOException {
+  public void flush() {
     try {
       if (batchWriter != null) {
         batchWriter.flush();
       }
     } catch (MutationsRejectedException e) {
-      throw new IOException(e);
-    }
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+    } 
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     try {
       if (batchWriter != null) {
         batchWriter.close();
         batchWriter = null;
       }
     } catch (MutationsRejectedException e) {
-      throw new IOException(e);
-    }
-    
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
+    } 
   }
 }

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Sat Nov  3 21:09:11 2012
@@ -21,12 +21,12 @@ package org.apache.gora.cassandra.query;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.QueryBase;
 import org.apache.gora.store.DataStore;
 
-public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
+public class CassandraQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
 
   private Query<K, T> query;
   

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Sat Nov  3 21:09:11 2012
@@ -27,14 +27,14 @@ import me.prettyprint.cassandra.serializ
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.specific.SpecificFixed;
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
 import org.apache.gora.store.DataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
+public class CassandraResult<K, T extends PersistentBase> extends ResultBase<K, T> {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
   
   private int rowNumber;
@@ -64,7 +64,6 @@ public class CassandraResult<K, T extend
    * Load key/value pair from Cassandra row to Avro record.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   private void updatePersistent() throws IOException {
     CassandraRow<K> cassandraRow = this.cassandraResultSet.get(this.rowNumber);
     

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Sat Nov  3 21:09:11 2012
@@ -56,6 +56,7 @@ import org.apache.gora.cassandra.seriali
 import org.apache.gora.cassandra.serializers.TypeUtils;
 import org.apache.gora.mapreduce.GoraRecordReader;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.persistency.State;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.query.Query;
@@ -63,7 +64,7 @@ import org.apache.gora.util.ByteUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CassandraClient<K, T extends Persistent> {
+public class CassandraClient<K, T extends PersistentBase> {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
   
   private Cluster cluster;

Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Sat Nov  3 21:09:11 2012
@@ -50,7 +50,6 @@ import org.apache.gora.cassandra.query.C
 import org.apache.gora.cassandra.query.CassandraSubColumn;
 import org.apache.gora.cassandra.query.CassandraSuperColumn;
 import org.apache.gora.persistency.ListGenericArray;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.persistency.impl.StateManagerImpl;
@@ -62,7 +61,7 @@ import org.apache.gora.store.impl.DataSt
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
+public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
 
   private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
@@ -79,18 +78,18 @@ public class CassandraStore<K, T extends
     // this.cassandraClient.initialize();
   }
 
-  public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) throws IOException {
-    super.initialize(keyClass, persistent, properties);
+  public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) {
     try {
+      super.initialize(keyClass, persistent, properties);
       this.cassandraClient.initialize(keyClass, persistent);
-    }
-    catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      LOG.error(e.getStackTrace().toString());
     }
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     LOG.debug("close");
     flush();
   }
@@ -102,25 +101,25 @@ public class CassandraStore<K, T extends
   }
 
   @Override
-  public boolean delete(K key) throws IOException {
+  public boolean delete(K key) {
     LOG.debug("delete " + key);
     return false;
   }
 
   @Override
-  public long deleteByQuery(Query<K, T> query) throws IOException {
+  public long deleteByQuery(Query<K, T> query) {
     LOG.debug("delete by query " + query);
     return 0;
   }
 
   @Override
-  public void deleteSchema() throws IOException {
+  public void deleteSchema() {
     LOG.debug("delete schema");
     this.cassandraClient.dropKeyspace();
   }
 
   @Override
-  public Result<K, T> execute(Query<K, T> query) throws IOException {
+  public Result<K, T> execute(Query<K, T> query) {
     
     Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
     Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
@@ -208,7 +207,7 @@ public class CassandraStore<K, T extends
    * @see org.apache.gora.store.DataStore#flush()
    */
   @Override
-  public void flush() throws IOException {
+  public void flush() {
     
     Set<K> keys = this.buffer.keySet();
     
@@ -237,14 +236,20 @@ public class CassandraStore<K, T extends
   }
 
   @Override
-  public T get(K key, String[] fields) throws IOException {
+  public T get(K key, String[] fields) {
     CassandraQuery<K,T> query = new CassandraQuery<K,T>();
     query.setDataStore(this);
     query.setKeyRange(key, key);
     query.setFields(fields);
     query.setLimit(1);
     Result<K,T> result = execute(query);
-    boolean hasResult = result.next();
+    boolean hasResult = false;
+    try {
+      hasResult = result.next();
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
     return hasResult ? result.get() : null;
   }
 
@@ -263,7 +268,7 @@ public class CassandraStore<K, T extends
    */
   @Override
   public String getSchemaName() {
-	return this.cassandraClient.getKeyspaceName();
+    return this.cassandraClient.getKeyspaceName();
   }
 
   @Override
@@ -278,7 +283,7 @@ public class CassandraStore<K, T extends
    * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
    */
   @Override
-  public void put(K key, T value) throws IOException {
+  public void put(K key, T value) {
     T p = (T) value.newInstance(new StateManagerImpl());
     Schema schema = value.getSchema();
     for (Field field: schema.getFields()) {
@@ -291,8 +296,8 @@ public class CassandraStore<K, T extends
         Type type = fieldSchema.getType();
         switch(type) {
           case RECORD:
-            Persistent persistent = (Persistent) fieldValue;
-            Persistent newRecord = persistent.newInstance(new StateManagerImpl());
+            PersistentBase persistent = (PersistentBase) fieldValue;
+            PersistentBase newRecord = (PersistentBase) persistent.newInstance(new StateManagerImpl());
             for (Field member: fieldSchema.getFields()) {
               newRecord.put(member.pos(), persistent.get(member.pos()));
             }
@@ -390,7 +395,7 @@ public class CassandraStore<K, T extends
   }
 
   @Override
-  public boolean schemaExists() throws IOException {
+  public boolean schemaExists() {
     LOG.info("schema exists");
     return cassandraClient.keyspaceExists();
   }

Modified: gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java (original)
+++ gora/trunk/gora-core/src/examples/java/org/apache/gora/examples/WebPageDataCreator.java Sat Nov  3 21:09:11 2012
@@ -109,30 +109,35 @@ public class WebPageDataCreator {
   
   public static void createWebPageData(DataStore<String, WebPage> dataStore) 
   throws IOException {
-    WebPage page;
-    log.info("creating web page data");
-    
-    for(int i=0; i<URLS.length; i++) {
-      page = new WebPage();
-      page.setUrl(new Utf8(URLS[i]));
-      page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes()));
-      for(String token : CONTENTS[i].split(" ")) {
-        page.addToParsedContent(new Utf8(token));  
-      }
-      
-      for(int j=0; j<LINKS[i].length; j++) {
-        page.putToOutlinks(new Utf8(URLS[LINKS[i][j]]), new Utf8(ANCHORS[i][j]));
-      }
-      
-      Metadata metadata = new Metadata();
-      metadata.setVersion(1);
-      metadata.putToData(new Utf8("metakey"), new Utf8("metavalue"));
-      page.setMetadata(metadata);
-      
-      dataStore.put(URLS[i], page);
-    }
-    dataStore.flush();
-    log.info("finished creating web page data");
+	  try{
+	    WebPage page;
+	    log.info("creating web page data");
+	    
+	    for(int i=0; i<URLS.length; i++) {
+	      page = new WebPage();
+	      page.setUrl(new Utf8(URLS[i]));
+	      page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes()));
+	      for(String token : CONTENTS[i].split(" ")) {
+	        page.addToParsedContent(new Utf8(token));  
+	      }
+	      
+	      for(int j=0; j<LINKS[i].length; j++) {
+	        page.putToOutlinks(new Utf8(URLS[LINKS[i][j]]), new Utf8(ANCHORS[i][j]));
+	      }
+	      
+	      Metadata metadata = new Metadata();
+	      metadata.setVersion(1);
+	      metadata.putToData(new Utf8("metakey"), new Utf8("metavalue"));
+	      page.setMetadata(metadata);
+	      
+	      dataStore.put(URLS[i], page);
+	    }
+	    dataStore.flush();
+	    log.info("finished creating web page data");
+  	}
+ 	catch(Exception e){
+ 		log.info("error creating web page data");
+ 	}
   }
   
   public int run(String[] args) throws Exception {

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumReader.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumReader.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumReader.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumReader.java Sat Nov  3 21:09:11 2012
@@ -38,13 +38,14 @@ import org.apache.gora.persistency.Persi
 import org.apache.gora.persistency.State;
 import org.apache.gora.persistency.StatefulHashMap;
 import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.persistency.impl.StateManagerImpl;
 import org.apache.gora.util.IOUtils;
 
 /**
  * PersistentDatumReader reads, fields' dirty and readable information.
  */
-public class PersistentDatumReader<T extends Persistent>
+public class PersistentDatumReader<T extends PersistentBase>
   extends SpecificDatumReader<T> {
 
   private Schema rootSchema;
@@ -212,7 +213,7 @@ public class PersistentDatumReader<T ext
   }
   
   public Persistent clone(Persistent persistent, Schema schema) {
-    Persistent cloned = persistent.newInstance(new StateManagerImpl());
+    Persistent cloned = (PersistentBase)persistent.newInstance(new StateManagerImpl());
     List<Field> fields = schema.getFields();
     for(Field field: fields) {
       int pos = field.pos();
@@ -220,10 +221,10 @@ public class PersistentDatumReader<T ext
         case MAP    :
         case ARRAY  :
         case RECORD : 
-        case STRING : cloned.put(pos, cloneObject(
-            field.schema(), persistent.get(pos), cloned.get(pos))); break;
+        case STRING : ((PersistentBase)cloned).put(pos, cloneObject(
+            field.schema(), ((PersistentBase)persistent).get(pos), ((PersistentBase)cloned).get(pos))); break;
         case NULL   : break;
-        default     : cloned.put(pos, persistent.get(pos)); break;
+        default     : ((PersistentBase)cloned).put(pos, ((PersistentBase)persistent).get(pos)); break;
       }
     }
     

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumWriter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumWriter.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumWriter.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/PersistentDatumWriter.java Sat Nov  3 21:09:11 2012
@@ -26,16 +26,16 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.State;
 import org.apache.gora.persistency.StateManager;
 import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.util.IOUtils;
 
 /**
  * PersistentDatumWriter writes, fields' dirty and readable information.
  */
-public class PersistentDatumWriter<T extends Persistent>
+public class PersistentDatumWriter<T extends PersistentBase>
   extends SpecificDatumWriter<T> {
 
   private T persistent = null;

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroQuery.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroQuery.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroQuery.java Sat Nov  3 21:09:11 2012
@@ -18,8 +18,11 @@
 
 package org.apache.gora.avro.query;
 
+import java.io.IOException;
+
 import org.apache.gora.avro.store.AvroStore;
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.QueryBase;
 
 /**
@@ -27,7 +30,7 @@ import org.apache.gora.query.impl.QueryB
  * most of the operations for Query, like setting start,end keys is not 
  * supported. Setting query limit is supported.
  */
-public class AvroQuery<K, T extends Persistent> extends QueryBase<K,T> {
+public class AvroQuery<K, T extends PersistentBase> extends QueryBase<K,T> {
 
   public AvroQuery() {
     super(null);

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroResult.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroResult.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/AvroResult.java Sat Nov  3 21:09:11 2012
@@ -25,13 +25,13 @@ import org.apache.avro.AvroTypeException
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.gora.avro.store.AvroStore;
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.impl.ResultBase;
 
 /**
  * Adapter to convert DatumReader to Result.
  */
-public class AvroResult<K, T extends Persistent> extends ResultBase<K, T> {
+public class AvroResult<K, T extends PersistentBase> extends ResultBase<K, T> {
 
   private DatumReader<T> reader;
   private Decoder decoder;
@@ -43,7 +43,6 @@ public class AvroResult<K, T extends Per
     this.decoder = decoder;
   }
 
-  @Override
   public void close() throws IOException {
   }
 

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/DataFileAvroResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/DataFileAvroResult.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/DataFileAvroResult.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/query/DataFileAvroResult.java Sat Nov  3 21:09:11 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.ResultBase;
 import org.apache.gora.store.DataStore;
@@ -30,7 +31,7 @@ import org.apache.gora.store.DataStore;
 /**
  * An Avro {@link DataFileReader} backed Result.
  */
-public class DataFileAvroResult<K, T extends Persistent> extends ResultBase<K, T> {
+public class DataFileAvroResult<K, T extends PersistentBase> extends ResultBase<K, T> {
 
   private SeekableInput in;
   private DataFileReader<T> reader;
@@ -58,9 +59,9 @@ public class DataFileAvroResult<K, T ext
 
   @Override
   public void close() throws IOException {
-    if(reader != null)
-      reader.close();
-    reader = null;
+	  if(reader != null)
+		  reader.close();
+	  reader = null;
   }
 
   @Override

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java Sat Nov  3 21:09:11 2012
@@ -35,7 +35,7 @@ import org.apache.avro.specific.Specific
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.gora.avro.query.AvroQuery;
 import org.apache.gora.avro.query.AvroResult;
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.FileSplitPartitionQuery;
@@ -45,17 +45,22 @@ import org.apache.gora.util.OperationNot
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An adapter DataStore for binary-compatible Avro serializations.
  * AvroDataStore supports Binary and JSON serializations.
  * @param <T>
  */
-public class AvroStore<K, T extends Persistent>
+public class AvroStore<K, T extends PersistentBase>
   extends FileBackedDataStoreBase<K, T> implements Configurable {
 
   /** The property key specifying avro encoder/decoder type to use. Can take values
    * "BINARY" or "JSON". */
   public static final String CODEC_TYPE_KEY = "codec.type";
+  
+  public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class);
 
   /**
    * The type of the avro Encoder/Decoder.
@@ -76,16 +81,16 @@ public class AvroStore<K, T extends Pers
 
   @Override
   public void initialize(Class<K> keyClass, Class<T> persistentClass,
-      Properties properties) throws IOException {
-    super.initialize(keyClass, persistentClass, properties);
-
-    if(properties != null) {
-      if(this.codecType == null) {
-        String codecType = DataStoreFactory.findProperty(
-            properties, this, CODEC_TYPE_KEY, "BINARY");
-        this.codecType = CodecType.valueOf(codecType);
+      Properties properties) {
+      super.initialize(keyClass, persistentClass, properties);
+  
+      if(properties != null) {
+        if(this.codecType == null) {
+          String codecType = DataStoreFactory.findProperty(
+              properties, this, CODEC_TYPE_KEY, "BINARY");
+          this.codecType = CodecType.valueOf(codecType);
+        }
       }
-    }
   }
 
   public void setCodecType(CodecType codecType) {
@@ -109,22 +114,27 @@ public class AvroStore<K, T extends Pers
   }
 
   @Override
-  public void close() throws IOException {
-    super.close();
-    if(encoder != null) {
-      encoder.flush();
+  public void close() {
+    try{
+      super.close();
+      if(encoder != null) {
+        encoder.flush();
+      }
+      encoder = null;
+      decoder = null;
+    }catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
     }
-    encoder = null;
-    decoder = null;
   }
 
   @Override
-  public boolean delete(K key) throws IOException {
+  public boolean delete(K key) {
     throw new OperationNotSupportedException("delete is not supported for AvroStore");
   }
 
   @Override
-  public long deleteByQuery(Query<K, T> query) throws IOException {
+  public long deleteByQuery(Query<K, T> query) {
     throw new OperationNotSupportedException("delete is not supported for AvroStore");
   }
 
@@ -148,14 +158,19 @@ public class AvroStore<K, T extends Pers
   }
 
   @Override
-  public void flush() throws IOException {
-    super.flush();
-    if(encoder != null)
-      encoder.flush();
+  public void flush() {
+    try{
+      super.flush();
+      if(encoder != null)
+        encoder.flush();
+    }catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
+    }
   }
 
   @Override
-  public T get(K key, String[] fields) throws IOException {
+  public T get(K key, String[] fields) {
     throw new OperationNotSupportedException();
   }
 
@@ -165,8 +180,13 @@ public class AvroStore<K, T extends Pers
   }
 
   @Override
-  public void put(K key, T obj) throws IOException {
-    getDatumWriter().write(obj, getEncoder());
+  public void put(K key, T obj) {
+    try{
+      getDatumWriter().write(obj, getEncoder());
+    }catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
+    }
   }
 
   public Encoder getEncoder() throws IOException {
@@ -235,12 +255,12 @@ public class AvroStore<K, T extends Pers
   }
 
   @Override
-  public void write(DataOutput out) throws IOException {
+  public void write(DataOutput out) {
     super.write(out);
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
+  public void readFields(DataInput in) {
     super.readFields(in);
   }
 

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java Sat Nov  3 21:09:11 2012
@@ -25,33 +25,44 @@ import org.apache.avro.file.DataFileWrit
 import org.apache.gora.avro.mapreduce.FsInput;
 import org.apache.gora.avro.query.DataFileAvroResult;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.FileSplitPartitionQuery;
 import org.apache.gora.util.OperationNotSupportedException;
 import org.apache.hadoop.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * DataFileAvroStore is file based store which uses Avro's 
  * DataFile{Writer,Reader}'s as a backend. This datastore supports 
  * mapreduce.
  */
-public class DataFileAvroStore<K, T extends Persistent> extends AvroStore<K, T> {
+public class DataFileAvroStore<K, T extends PersistentBase> extends AvroStore<K, T> {
 
+  public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class);
+  
   public DataFileAvroStore() {
   }
   
   private DataFileWriter<T> writer;
   
   @Override
-  public T get(K key, String[] fields) throws java.io.IOException {
+  public T get(K key, String[] fields) {
     throw new OperationNotSupportedException(
         "Avro DataFile's does not support indexed retrieval");
   };
   
   @Override
-  public void put(K key, T obj) throws java.io.IOException {
-    getWriter().append(obj);
+  public void put(K key, T obj) {
+    try{
+      getWriter().append(obj);
+    } catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
+    }
   };
   
   private DataFileWriter<T> getWriter() throws IOException {
@@ -63,18 +74,29 @@ public class DataFileAvroStore<K, T exte
   }
   
   @Override
-  protected Result<K, T> executeQuery(Query<K, T> query) throws IOException {
-    return new DataFileAvroResult<K, T>(this, query
-        , createReader(createFsInput()));
+  protected Result<K, T> executeQuery(Query<K, T> query) {
+    try{
+      return new DataFileAvroResult<K, T>(this, query
+          , createReader(createFsInput()));
+    } catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
+      return null;
+    }
   }
  
   @Override
-  protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query) 
-    throws IOException {
-    FsInput fsInput = createFsInput();
-    DataFileReader<T> reader = createReader(fsInput);
-    return new DataFileAvroResult<K, T>(this, query, reader, fsInput
-        , query.getStart(), query.getLength());
+  protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query) {
+    try{
+      FsInput fsInput = createFsInput();
+      DataFileReader<T> reader = createReader(fsInput);
+      return new DataFileAvroResult<K, T>(this, query, reader, fsInput
+          , query.getStart(), query.getLength());
+    } catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
+      return null;
+    }
   }
   
   private DataFileReader<T> createReader(FsInput fsInput) throws IOException {
@@ -87,19 +109,29 @@ public class DataFileAvroStore<K, T exte
   }
   
   @Override
-  public void flush() throws IOException {
-    super.flush();
-    if(writer != null) {
-      writer.flush();
+  public void flush() {
+    try{
+      super.flush();
+      if(writer != null) {
+        writer.flush();
+      }
+    } catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
     }
   }
   
   @Override
-  public void close() throws IOException {
-    if(writer != null)  
-      writer.close(); //hadoop 0.20.2 HDFS streams do not allow 
-                      //to close twice, so close the writer first 
-    writer = null;
-    super.close();
+  public void close() {
+    try{
+      if(writer != null)  
+        writer.close(); //hadoop 0.20.2 HDFS streams do not allow 
+                        //to close twice, so close the writer first 
+      writer = null;
+      super.close();
+    } catch(IOException ex){
+      LOG.error(ex.getMessage());
+      LOG.error(ex.getStackTrace().toString());
+    }
   }
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java Sat Nov  3 21:09:11 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.FileSplitPartitionQuery;
@@ -52,7 +53,7 @@ import org.apache.hadoop.mapreduce.lib.i
  * 
  * @see GoraMapper
  */
-public class GoraInputFormat<K, T extends Persistent>
+public class GoraInputFormat<K, T extends PersistentBase>
   extends InputFormat<K, T> implements Configurable {
 
   public static final String QUERY_KEY   = "gora.inputformat.query";

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Sat Nov  3 21:09:11 2012
@@ -21,8 +21,10 @@ package org.apache.gora.mapreduce;
 import java.io.IOException;
 
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.ResultBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -33,7 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
  * An adapter for Result to Hadoop RecordReader.
  */
-public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
+public class GoraRecordReader<K, T extends PersistentBase> extends RecordReader<K,T> {
   public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
 
   public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
@@ -62,7 +64,7 @@ public class GoraRecordReader<K, T exten
     this.query.setLimit(recordsMax);
   }
 
-  public void executeQuery() throws IOException {
+  public void executeQuery() throws IOException, Exception {
     this.result = query.execute();
   }
   
@@ -78,7 +80,12 @@ public class GoraRecordReader<K, T exten
 
   @Override
   public float getProgress() throws IOException, InterruptedException {
-    return result.getProgress();
+    try{
+	  return result.getProgress();
+  	}
+ 	catch(Exception e){
+ 		return 0;
+ 	}
   }
 
   @Override
@@ -87,31 +94,36 @@ public class GoraRecordReader<K, T exten
 
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (counter.isModulo()) {
-      boolean firstBatch = (this.result == null);
-      if (! firstBatch) {
-        this.query.setStartKey(this.result.getKey());
-        if (this.query.getLimit() == counter.getRecordsMax()) {
-          this.query.setLimit(counter.getRecordsMax() + 1);
-        }
-      }
-      if (this.result != null) {
-        this.result.close();
-      }
-      
-      executeQuery();
-      
-      if (! firstBatch) {
-        // skip first result
-        this.result.next();
-      }
-    }
-    
-    counter.increment();
-    return this.result.next();
+	  try{
+	    if (counter.isModulo()) {
+	      boolean firstBatch = (this.result == null);
+	      if (! firstBatch) {
+	        this.query.setStartKey(this.result.getKey());
+	        if (this.query.getLimit() == counter.getRecordsMax()) {
+	          this.query.setLimit(counter.getRecordsMax() + 1);
+	        }
+	      }
+	      if (this.result != null) {
+	        this.result.close();
+	      }
+	      
+	      executeQuery();
+	      
+	      if (! firstBatch) {
+	        // skip first result
+	        this.result.next();
+	      }
+	    }
+	    
+	    counter.increment();
+	    return this.result.next();
+	  }
+	  catch(Exception e){
+		return false;
+	  }
   }
 
-  @Override
+  //@Override
   public void close() throws IOException {
     if (result != null) {
       result.close();

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java Sat Nov  3 21:09:11 2012
@@ -52,17 +52,25 @@ public class GoraRecordWriter<K, T> exte
   @Override
   public void close(TaskAttemptContext context) throws IOException,
       InterruptedException {
-    store.close();
+    try {
+      store.close();
+    } catch (Exception e) { // propagate with the cause instead of logging only the message
+      throw new IOException("Exception at GoraRecordWriter.class while closing datastore.", e);
+    }
   }
 
   @Override
   public void write(K key, T value) throws IOException, InterruptedException {
-    store.put(key, (Persistent) value);
-    
-    counter.increment();
-    if (counter.isModulo()) {
-      LOG.info("Flushing the datastore after " + counter.getRecordsNumber() + " records");
-      store.flush();
-    }
+    try {
+      store.put(key, (Persistent) value);
+      counter.increment();
+      if (counter.isModulo()) {
+        LOG.info("Flushing the datastore after " + counter.getRecordsNumber() + " records");
+        store.flush();
+      }
+    } catch (Exception e) {
+      // Surface the failure with its cause rather than silently dropping the record.
+      throw new IOException("Exception at GoraRecordWriter.class while writing to datastore.", e);
+    }
   }
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java Sat Nov  3 21:09:11 2012
@@ -25,6 +25,7 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.gora.avro.PersistentDatumReader;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.util.AvroUtils;
 import org.apache.hadoop.io.serializer.Deserializer;
 
@@ -33,26 +34,26 @@ import org.apache.hadoop.io.serializer.D
 * with {@link BinaryDecoder}.
 */
 public class PersistentDeserializer
-   implements Deserializer<Persistent> {
+   implements Deserializer<PersistentBase> {
 
   private BinaryDecoder decoder;
-  private Class<? extends Persistent> persistentClass;
+  private Class<? extends PersistentBase> persistentClass;
   private boolean reuseObjects;
-  private PersistentDatumReader<Persistent> datumReader;
+  private PersistentDatumReader<PersistentBase> datumReader;
 
-  public PersistentDeserializer(Class<? extends Persistent> c, boolean reuseObjects) {
+  public PersistentDeserializer(Class<? extends PersistentBase> c, boolean reuseObjects) {
     this.persistentClass = c;
     this.reuseObjects = reuseObjects;
     try {
       Schema schema = AvroUtils.getSchema(persistentClass);
-      datumReader = new PersistentDatumReader<Persistent>(schema, true);
+      datumReader = new PersistentDatumReader<PersistentBase>(schema, true);
 
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
 
-  @Override
+  //@Override
   public void open(InputStream in) throws IOException {
     /* It is very important to use a direct buffer, since Hadoop
      * supplies an input stream that is only valid until the end of one
@@ -64,11 +65,11 @@ public class PersistentDeserializer
       .createBinaryDecoder(in, decoder);
   }
 
-  @Override
+  //@Override
   public void close() throws IOException { }
 
   @Override
-  public Persistent deserialize(Persistent persistent) throws IOException {
+  public PersistentBase deserialize(PersistentBase persistent) throws IOException {
     return datumReader.read(reuseObjects ? persistent : null, decoder);
   }
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java Sat Nov  3 21:09:11 2012
@@ -18,12 +18,13 @@
 package org.apache.gora.mapreduce;
 
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
 
 public class PersistentNonReusingSerialization
-implements Serialization<Persistent> {
+implements Serialization<PersistentBase> {
 
   @Override
   public boolean accept(Class<?> c) {
@@ -31,12 +32,12 @@ implements Serialization<Persistent> {
   }
 
   @Override
-  public Deserializer<Persistent> getDeserializer(Class<Persistent> c) {
+  public Deserializer<PersistentBase> getDeserializer(Class<PersistentBase> c) {
     return new PersistentDeserializer(c, false);
   }
 
   @Override
-  public Serializer<Persistent> getSerializer(Class<Persistent> c) {
+  public Serializer<PersistentBase> getSerializer(Class<PersistentBase> c) {
     return new PersistentSerializer();
   }
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java Sat Nov  3 21:09:11 2012
@@ -17,13 +17,14 @@
  */
 package org.apache.gora.mapreduce;
 
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.persistency.Persistent;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
 
 public class PersistentSerialization
-implements Serialization<Persistent> {
+implements Serialization<PersistentBase> {
 
   @Override
   public boolean accept(Class<?> c) {
@@ -31,12 +32,12 @@ implements Serialization<Persistent> {
   }
 
   @Override
-  public Deserializer<Persistent> getDeserializer(Class<Persistent> c) {
+  public Deserializer<PersistentBase> getDeserializer(Class<PersistentBase> c) {
     return new PersistentDeserializer(c, true);
   }
 
   @Override
-  public Serializer<Persistent> getSerializer(Class<Persistent> c) {
+  public Serializer<PersistentBase> getSerializer(Class<PersistentBase> c) {
     return new PersistentSerializer();
   }
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java Sat Nov  3 21:09:11 2012
@@ -22,34 +22,34 @@ import java.io.OutputStream;
 
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.gora.avro.PersistentDatumWriter;
-import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.hadoop.io.serializer.Serializer;
 
 /**
  * Hadoop serializer using {@link PersistentDatumWriter} 
  * with {@link BinaryEncoder}. 
  */
-public class PersistentSerializer implements Serializer<Persistent> {
+public class PersistentSerializer implements Serializer<PersistentBase> {
 
-  private PersistentDatumWriter<Persistent> datumWriter;
+  private PersistentDatumWriter<PersistentBase> datumWriter;
   private BinaryEncoder encoder;  
   
   public PersistentSerializer() {
-    this.datumWriter = new PersistentDatumWriter<Persistent>();
+    this.datumWriter = new PersistentDatumWriter<PersistentBase>();
   }
   
-  @Override
+  //@Override
   public void close() throws IOException {
     encoder.flush();
   }
 
-  @Override
+  //@Override
   public void open(OutputStream out) throws IOException {
     encoder = new BinaryEncoder(out);
   }
 
   @Override
-  public void serialize(Persistent persistent) throws IOException {   
+  public void serialize(PersistentBase persistent) throws IOException {   
     datumWriter.setSchema(persistent.getSchema());
     datumWriter.setPersistent(persistent);
         

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java Sat Nov  3 21:09:11 2012
@@ -27,6 +27,7 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.persistency.impl.StateManagerImpl;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -40,9 +41,9 @@ import org.apache.gora.store.impl.DataSt
 /**
  * Memory based {@link DataStore} implementation for tests.
  */
-public class MemStore<K, T extends Persistent> extends DataStoreBase<K, T> {
+public class MemStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
 
-  public static class MemQuery<K, T extends Persistent> extends QueryBase<K, T> {
+  public static class MemQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
     public MemQuery() {
       super(null);
     }
@@ -51,7 +52,7 @@ public class MemStore<K, T extends Persi
     }
   }
 
-  public static class MemResult<K, T extends Persistent> extends ResultBase<K, T> {
+  public static class MemResult<K, T extends PersistentBase> extends ResultBase<K, T> {
     private NavigableMap<K, T> map;
     private Iterator<K> iterator;
     public MemResult(DataStore<K, T> dataStore, Query<K, T> query
@@ -60,8 +61,9 @@ public class MemStore<K, T extends Persi
       this.map = map;
       iterator = map.navigableKeySet().iterator();
     }
-    @Override
-    public void close() throws IOException { }
+    //@Override
+    public void close() { }
+    
     @Override
     public float getProgress() throws IOException {
       return 0;
@@ -91,25 +93,29 @@ public class MemStore<K, T extends Persi
   }
 
   @Override
-  public boolean delete(K key) throws IOException {
+  public boolean delete(K key) {
     return map.remove(key) != null;
   }
 
   @Override
-  public long deleteByQuery(Query<K, T> query) throws IOException {
-    long deletedRows = 0;
-    Result<K,T> result = query.execute();
-
-    while(result.next()) {
-      if(delete(result.getKey()))
-        deletedRows++;
-    }
-
-    return 0;
+  public long deleteByQuery(Query<K, T> query) {
+    // Counted outside the try so rows already removed are still reported after a failure.
+    long deletedRows = 0;
+    try {
+      Result<K, T> result = query.execute();
+      while (result.next()) {
+        if (delete(result.getKey())) {
+          deletedRows++;
+        }
+      }
+    } catch (Exception ignored) {
+      // Best effort, as before: stop at the first failure; the signature has no throws clause.
+    }
+    return deletedRows;
   }
 
   @Override
-  public Result<K, T> execute(Query<K, T> query) throws IOException {
+  public Result<K, T> execute(Query<K, T> query) {
     K startKey = query.getStartKey();
     K endKey = query.getEndKey();
     if(startKey == null) {
@@ -128,7 +134,7 @@ public class MemStore<K, T extends Persi
   }
 
   @Override
-  public T get(K key, String[] fields) throws IOException {
+  public T get(K key, String[] fields) {
     T obj = map.get(key);
     return getPersistent(obj, getFieldsToQuery(fields));
   }
@@ -144,7 +150,7 @@ public class MemStore<K, T extends Persi
     T newObj = (T) obj.newInstance(new StateManagerImpl());
     for(String field:fields) {
       int index = newObj.getFieldIndex(field);
-      newObj.put(index, obj.get(index));
+      ((PersistentBase)newObj).put(index, ((PersistentBase)obj).get(index));
     }
     return newObj;
   }
@@ -155,7 +161,7 @@ public class MemStore<K, T extends Persi
   }
 
   @Override
-  public void put(K key, T obj) throws IOException {
+  public void put(K key, T obj) {
     map.put(key, obj);
   }
 
@@ -163,31 +169,30 @@ public class MemStore<K, T extends Persi
   /**
    * Returns a single partition containing the original query
    */
-  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
-      throws IOException {
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query){
     List<PartitionQuery<K, T>> list = new ArrayList<PartitionQuery<K,T>>();
     list.add(new PartitionQueryImpl<K, T>(query));
     return list;
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     map.clear();
   }
 
   @Override
-  public void createSchema() throws IOException { }
+  public void createSchema() { }
 
   @Override
-  public void deleteSchema() throws IOException {
+  public void deleteSchema() {
     map.clear();
   }
 
   @Override
-  public boolean schemaExists() throws IOException {
+  public boolean schemaExists() {
     return true;
   }
 
   @Override
-  public void flush() throws IOException { }
+  public void flush() { }
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java Sat Nov  3 21:09:11 2012
@@ -21,7 +21,7 @@ package org.apache.gora.persistency;
 /**
  * BeanFactory's enable contruction of keys and Persistent objects. 
  */
-public interface BeanFactory<K, T extends Persistent> {
+public interface BeanFactory<K, T>{
 
   /**
    * Constructs a new instance of the key class

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/Persistent.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/Persistent.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/Persistent.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/Persistent.java Sat Nov  3 21:09:11 2012
@@ -17,12 +17,10 @@
  */
 package org.apache.gora.persistency;
 
-import org.apache.avro.specific.SpecificRecord;
-
 /**
  * Objects that are persisted by Gora implements this interface.
  */
-public interface Persistent extends SpecificRecord, Cloneable {
+public interface Persistent extends Cloneable{
 
   /**
    * Returns the StateManager which manages the persistent 
@@ -183,4 +181,5 @@ public interface Persistent extends Spec
   void clearReadable();
   
   Persistent clone();
+
 }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/StateManager.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/StateManager.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/StateManager.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/StateManager.java Sat Nov  3 21:09:11 2012
@@ -21,7 +21,7 @@ package org.apache.gora.persistency;
 /**
  * StateManager manages objects state for persistency.
  */
-public interface StateManager {
+public interface StateManager{
 
   /**
    * If one state manager is allocated per persistent object, 

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/PersistentBase.java Sat Nov  3 21:09:11 2012
@@ -35,7 +35,7 @@ import org.apache.gora.persistency.State
  * Base classs implementing common functionality for Persistent
  * classes.
  */
-public abstract class PersistentBase implements Persistent {
+public abstract class PersistentBase implements Persistent, SpecificRecord {
 
   protected static Map<Class<?>, Map<String, Integer>> FIELD_MAP =
     new HashMap<Class<?>, Map<String,Integer>>();
@@ -43,8 +43,8 @@ public abstract class PersistentBase imp
   protected static Map<Class<?>, String[]> FIELDS =
     new HashMap<Class<?>, String[]>();
 
-  protected static final PersistentDatumReader<Persistent> datumReader =
-    new PersistentDatumReader<Persistent>();
+  protected static final PersistentDatumReader<PersistentBase> datumReader =
+    new PersistentDatumReader<PersistentBase>();
     
   private StateManager stateManager;
 
@@ -230,7 +230,7 @@ public abstract class PersistentBase imp
     clearReadable(getFieldIndex(field));
   }
 
-  @Override
+  //@Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (!(o instanceof SpecificRecord)) return false;
@@ -241,7 +241,7 @@ public abstract class PersistentBase imp
     return this.hashCode() == r2.hashCode();
   }
 
-  @Override
+  //@Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
@@ -280,7 +280,7 @@ public abstract class PersistentBase imp
     return datumReader.clone(this, getSchema());
   }
   
-  @Override
+  //@Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append(super.toString());

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/StateManagerImpl.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/StateManagerImpl.java?rev=1405417&r1=1405416&r2=1405417&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/StateManagerImpl.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/impl/StateManagerImpl.java Sat Nov  3 21:09:11 2012
@@ -38,8 +38,8 @@ public class StateManagerImpl implements
   }
 
   public void setManagedPersistent(Persistent persistent) {
-    dirtyBits = new BitSet(persistent.getSchema().getFields().size());
-    readableBits = new BitSet(persistent.getSchema().getFields().size());
+    dirtyBits = new BitSet(((PersistentBase)persistent).getSchema().getFields().size());
+    readableBits = new BitSet(((PersistentBase)persistent).getSchema().getFields().size());
     isNew = true;
   }
 

Added: gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/BeanFactoryWSImpl.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/BeanFactoryWSImpl.java?rev=1405417&view=auto
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/BeanFactoryWSImpl.java (added)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/ws/impl/BeanFactoryWSImpl.java Sat Nov  3 21:09:11 2012
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.persistency.ws.impl;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.gora.persistency.BeanFactory;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.util.ReflectionUtils;
+
+/**
+ * A default implementation of the {@link BeanFactory} interface. Constructs
+ * the keys by reflection and the {@link Persistent} objects by calling
+ * {@link Persistent#newInstance(org.apache.gora.persistency.StateManager)}.
+ */
+public class BeanFactoryWSImpl<K, T extends Persistent> implements BeanFactory<K, T> {
+
+  /**
+   * Class of the key to be used
+   */
+  private Class<K> keyClass;
+
+  /**
+   * Class of the persistent objects to be stored
+   */
+  private Class<T> persistentClass;
+
+  /**
+   * Constructor of the key; stays null when ReflectionUtils finds none for the key class
+   */
+  private Constructor<K> keyConstructor;
+
+  /**
+   * Cached key instance; stays null when no key constructor was found
+   */
+  private K key;
+
+  /**
+   * Cached persistent object of class T, used as the template for new instances
+   */
+  private T persistent;
+
+  /**
+   * Flag to be used to determine if a key is persistent or not
+   */
+  private boolean isKeyPersistent = false;
+
+  /**
+   * Builds the factory and caches one key (when constructible) and one persistent instance.
+   * @param keyClass class of the keys produced by this factory
+   * @param persistentClass class of the persistent objects; instantiated reflectively here
+   */
+  public BeanFactoryWSImpl(Class<K> keyClass, Class<T> persistentClass) {
+    this.keyClass = keyClass;
+    this.persistentClass = persistentClass;
+
+    try {
+      if (ReflectionUtils.hasConstructor(keyClass)) {
+        this.keyConstructor = ReflectionUtils.getConstructor(keyClass);
+        this.key = keyConstructor.newInstance(ReflectionUtils.EMPTY_OBJECT_ARRAY);
+      }
+      this.persistent = ReflectionUtils.newInstance(persistentClass);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    isKeyPersistent = Persistent.class.isAssignableFrom(keyClass);
+  }
+
+  /**
+   * Creates a new key: from the cached key when keys are persistent, else via the constructor.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public K newKey() throws Exception {
+    // TODO this method should be checked to see how object states will be managed
+    if (isKeyPersistent && key != null) {
+      return (K) ((Persistent) key).newInstance(new StateManagerWSImpl());
+    }
+    if (keyConstructor == null) {
+      throw new RuntimeException("Key class does not have a no-arg constructor");
+    }
+    return keyConstructor.newInstance(ReflectionUtils.EMPTY_OBJECT_ARRAY);
+  }
+
+  /**
+   * Creates a new persistent object from the cached one, backed by a new StateManagerWSImpl.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public T newPersistent() {
+    return (T) persistent.newInstance(new StateManagerWSImpl());
+  }
+
+  /**
+   * Gets the cached key; may be null when no key could be constructed
+   */
+  @Override
+  public K getCachedKey() {
+    return key;
+  }
+
+  /**
+   * Gets the cached persistent object
+   */
+  @Override
+  public T getCachedPersistent() {
+    return persistent;
+  }
+
+  /**
+   * Gets the key class
+   */
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  /**
+   * Gets the persistent object class
+   */
+  @Override
+  public Class<T> getPersistentClass() {
+    return persistentClass;
+  }
+
+  /**
+   * Returns if a key is an object of a persistent class
+   * @return True if it is or false if it is not
+   */
+  public boolean isKeyPersistent() {
+    return isKeyPersistent;
+  }
+}



Mime
View raw message