Return-Path: X-Original-To: apmail-gora-commits-archive@www.apache.org Delivered-To: apmail-gora-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4BAE7DE2D for ; Wed, 31 Oct 2012 05:54:41 +0000 (UTC) Received: (qmail 22779 invoked by uid 500); 31 Oct 2012 05:54:41 -0000 Delivered-To: apmail-gora-commits-archive@gora.apache.org Received: (qmail 22732 invoked by uid 500); 31 Oct 2012 05:54:40 -0000 Mailing-List: contact commits-help@gora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gora.apache.org Delivered-To: mailing list commits@gora.apache.org Received: (qmail 22725 invoked by uid 99); 31 Oct 2012 05:54:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2012 05:54:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 Oct 2012 05:54:31 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A6CF923888FE; Wed, 31 Oct 2012 05:53:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1403993 [1/2] - in /gora/branches/goraamazon: gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ gora-cassandra/src/main/java/org/apache/gora/cassandra/store/ gora-core/src/main/java/org/apache/gora/avro/store/ gora-core/src/main/... Date: Wed, 31 Oct 2012 05:53:44 -0000 To: commits@gora.apache.org From: rmarroquin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121031055345.A6CF923888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rmarroquin Date: Wed Oct 31 05:53:43 2012 New Revision: 1403993 URL: http://svn.apache.org/viewvc?rev=1403993&view=rev Log: Committing new patch for changes in the way exception were being handled. Modified: gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java gora/branches/goraamazon/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/query/impl/FileSplitPartitionQuery.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStore.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java gora/branches/goraamazon/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java gora/branches/goraamazon/gora-sql/src/main/java/org/apache/gora/sql/store/SqlStore.java Modified: gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java (original) +++ gora/branches/goraamazon/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java Wed Oct 31 05:53:43 2012 @@ -97,6 +97,8 @@ import org.apache.hadoop.io.Text; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -116,6 +118,8 @@ public class AccumuloStore keyClass, Class persistentClass, Properties properties) throws IOException { - super.initialize(keyClass, persistentClass, properties); - - String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); - String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); - String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); - String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); - - mapping = readMapping(mappingFile); - - if (mapping.encoder == null || mapping.encoder.equals("")) { - encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder(); - } else { + public void initialize(Class keyClass, Class persistentClass, Properties properties) { + try{ + super.initialize(keyClass, persistentClass, properties); + + String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); + String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); + String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); + String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); + + mapping = readMapping(mappingFile); + + if (mapping.encoder == null || mapping.encoder.equals("")) { + encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder(); + } else { + try { + encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + try { - encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); - } catch (InstantiationException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { + if (mock == null || !mock.equals("true")) { + String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); + String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); + conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password); + authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID()); + } else { + conn = new MockInstance().getConnector(user, password); + } + + if (autoCreateSchema) + createSchema(); + } catch (AccumuloException e) { throw new IOException(e); - } catch (ClassNotFoundException e) { + } catch (AccumuloSecurityException e) { throw new IOException(e); } - } - - try { - if (mock == null || !mock.equals("true")) { - String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); - String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); - conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password); - authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID()); - } else { - conn = new MockInstance().getConnector(user, password); - } - - if (autoCreateSchema) - createSchema(); - } catch (AccumuloException e) { - throw new IOException(e); - } catch (AccumuloSecurityException e) { - throw new IOException(e); + }catch(IOException e){ + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); } } @@ -341,7 +350,7 @@ public class AccumuloStore> es = mapping.tableConfig.entrySet(); @@ -350,32 +359,38 @@ public class AccumuloStore iter = schema.getFields().iterator(); - - int count = 0; - for (int i = 0; iter.hasNext(); i++) { - Field field = iter.next(); - if (!stateManager.isDirty(val, i)) { - continue; - } + try{ + Mutation m = new Mutation(new Text(toBytes(key))); - Object o = val.get(i); - Pair col = mapping.fieldMap.get(field.name()); - - switch (field.schema().getType()) { - case MAP: - if (o instanceof StatefulMap) { - StatefulMap map = (StatefulMap) o; - Set es = map.states().entrySet(); - for (Object entry : es) { - Object mapKey = ((Entry) entry).getKey(); - State state = (State) ((Entry) entry).getValue(); - - switch (state) { - case NEW: - case DIRTY: - m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey)))); - count++; - break; - case DELETED: - m.putDelete(col.getFirst(), new Text(toBytes(mapKey))); - count++; - break; + Schema schema = val.getSchema(); + StateManager stateManager = val.getStateManager(); + + Iterator iter = schema.getFields().iterator(); + + int count = 0; + for (int i = 0; iter.hasNext(); i++) { + Field field = iter.next(); + if (!stateManager.isDirty(val, i)) { + continue; + } + + Object o = val.get(i); + Pair col = mapping.fieldMap.get(field.name()); + + switch (field.schema().getType()) { + case MAP: + if (o instanceof StatefulMap) { + StatefulMap map = (StatefulMap) o; + Set es = map.states().entrySet(); + for (Object entry : es) { + Object mapKey = ((Entry) entry).getKey(); + State state = (State) ((Entry) entry).getValue(); + + switch (state) { + case NEW: + case DIRTY: + m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey)))); + count++; + break; + case DELETED: + m.putDelete(col.getFirst(), new Text(toBytes(mapKey))); + count++; + break; + } + + } + } else { + Map map = (Map) o; + Set es = map.entrySet(); + for (Object entry : es) { + Object mapKey = ((Entry) entry).getKey(); + Object mapVal = ((Entry) entry).getValue(); + m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal))); + count++; } - } - } else { - Map map = (Map) o; - Set es = map.entrySet(); - for (Object entry : es) { - Object mapKey = ((Entry) entry).getKey(); - Object mapVal = ((Entry) entry).getValue(); - m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal))); + break; + case ARRAY: + GenericArray array = (GenericArray) o; + int j = 0; + for (Object item : array) { + m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); count++; } - } - break; - case ARRAY: - GenericArray array = (GenericArray) o; - int j = 0; - for (Object item : array) { - m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); + break; + case RECORD: + SpecificDatumWriter writer = new SpecificDatumWriter(field.schema()); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + BinaryEncoder encoder = new BinaryEncoder(os); + writer.write(o, encoder); + encoder.flush(); + m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray())); + break; + default: + m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); count++; - } - break; - case RECORD: - SpecificDatumWriter writer = new SpecificDatumWriter(field.schema()); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - BinaryEncoder encoder = new BinaryEncoder(os); - writer.write(o, encoder); - encoder.flush(); - m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray())); - break; - default: - m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); - count++; + } + } - + + if (count > 0) + try { + getBatchWriter().addMutation(m); + } catch (MutationsRejectedException e) { + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); + } + } catch (IOException e) { + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); } - - if (count > 0) - try { - getBatchWriter().addMutation(m); - } catch (MutationsRejectedException e) { - throw new IOException(e); - } } @Override - public boolean delete(K key) throws IOException { + public boolean delete(K key) { Query q = newQuery(); q.setKey(key); return deleteByQuery(q) > 0; } @Override - public long deleteByQuery(Query query) throws IOException { + public long deleteByQuery(Query query) { try { Scanner scanner = createScanner(query); // add iterator that drops values on the server side @@ -613,9 +640,17 @@ public class AccumuloStore execute(Query query) throws IOException { + public Result execute(Query query) { try { Scanner scanner = createScanner(query); return new AccumuloResult(this, query, scanner); } catch (TableNotFoundException e) { // TODO return empty result? - throw new IOException(e); - } + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); + return null; + } } @Override @@ -817,26 +854,27 @@ public class AccumuloStore keyClass, Class persistent, Properties properties) throws IOException { - super.initialize(keyClass, persistent, properties); + public void initialize(Class keyClass, Class persistent, Properties properties) { try { + super.initialize(keyClass, persistent, properties); this.cassandraClient.initialize(keyClass, persistent); - } - catch (Exception e) { - throw new IOException(e.getMessage(), e); + } catch (Exception e) { + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); } } @Override - public void close() throws IOException { + public void close() { LOG.debug("close"); flush(); } @@ -101,25 +101,25 @@ public class CassandraStore query) throws IOException { + public long deleteByQuery(Query query) { LOG.debug("delete by query " + query); return 0; } @Override - public void deleteSchema() throws IOException { + public void deleteSchema() { LOG.debug("delete schema"); this.cassandraClient.dropKeyspace(); } @Override - public Result execute(Query query) throws IOException { + public Result execute(Query query) { Map> familyMap = this.cassandraClient.getFamilyMap(query); Map reverseMap = this.cassandraClient.getReverseMap(query); @@ -207,7 +207,7 @@ public class CassandraStore keys = this.buffer.keySet(); @@ -236,14 +236,20 @@ public class CassandraStore query = new CassandraQuery(); query.setDataStore(this); query.setKeyRange(key, key); query.setFields(fields); query.setLimit(1); Result result = execute(query); - boolean hasResult = result.next(); + boolean hasResult = false; + try { + hasResult = result.next(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } return hasResult ? result.get() : null; } @@ -262,7 +268,7 @@ public class CassandraStore keyClass, Class persistentClass, - Properties properties) throws IOException { - super.initialize(keyClass, persistentClass, properties); - - if(properties != null) { - if(this.codecType == null) { - String codecType = DataStoreFactory.findProperty( - properties, this, CODEC_TYPE_KEY, "BINARY"); - this.codecType = CodecType.valueOf(codecType); + Properties properties) { + super.initialize(keyClass, persistentClass, properties); + + if(properties != null) { + if(this.codecType == null) { + String codecType = DataStoreFactory.findProperty( + properties, this, CODEC_TYPE_KEY, "BINARY"); + this.codecType = CodecType.valueOf(codecType); + } } - } } public void setCodecType(CodecType codecType) { @@ -109,22 +114,27 @@ public class AvroStore query) throws IOException { + public long deleteByQuery(Query query) { throw new OperationNotSupportedException("delete is not supported for AvroStore"); } @@ -148,14 +158,19 @@ public class AvroStore extends AvroStore { + public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class); + public DataFileAvroStore() { } private DataFileWriter writer; @Override - public T get(K key, String[] fields) throws java.io.IOException { + public T get(K key, String[] fields) { throw new OperationNotSupportedException( "Avro DataFile's does not support indexed retrieval"); }; @Override - public void put(K key, T obj) throws java.io.IOException { - getWriter().append(obj); + public void put(K key, T obj) { + try{ + getWriter().append(obj); + } catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + } }; private DataFileWriter getWriter() throws IOException { @@ -64,18 +74,29 @@ public class DataFileAvroStore executeQuery(Query query) throws IOException { - return new DataFileAvroResult(this, query - , createReader(createFsInput())); + protected Result executeQuery(Query query) { + try{ + return new DataFileAvroResult(this, query + , createReader(createFsInput())); + } catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + return null; + } } @Override - protected Result executePartial(FileSplitPartitionQuery query) - throws IOException { - FsInput fsInput = createFsInput(); - DataFileReader reader = createReader(fsInput); - return new DataFileAvroResult(this, query, reader, fsInput - , query.getStart(), query.getLength()); + protected Result executePartial(FileSplitPartitionQuery query) { + try{ + FsInput fsInput = createFsInput(); + DataFileReader reader = createReader(fsInput); + return new DataFileAvroResult(this, query, reader, fsInput + , query.getStart(), query.getLength()); + } catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + return null; + } } private DataFileReader createReader(FsInput fsInput) throws IOException { @@ -88,19 +109,29 @@ public class DataFileAvroStore query) throws IOException { + public long deleteByQuery(Query query) { try{ long deletedRows = 0; Result result = query.execute(); @@ -114,7 +115,7 @@ public class MemStore execute(Query query) throws IOException { + public Result execute(Query query) { K startKey = query.getStartKey(); K endKey = query.getEndKey(); if(startKey == null) { @@ -133,7 +134,7 @@ public class MemStoreNote: Results of updates ({@link #put(Object, Persistent)}, * {@link #delete(Object)} and {@link #deleteByQuery(Query)} operations) are * guaranteed to be visible to subsequent get / execute operations ONLY - * after a subsequent call to {@link #flush()}. + * after a subsequent call to {@link #flush()}. Additionally, exception + * handling is largely DataStore specific and is not largely dealt + * with from within this interface. * @param the class of keys in the datastore * @param the class of persistent objects in the datastore */ @@ -47,10 +49,10 @@ public interface DataStore { * @param keyClass the class of the keys * @param persistentClass the class of the persistent objects * @param properties extra metadata - * @throws Exception + * @throws IOException */ void initialize(Class keyClass, Class persistentClass, - Properties properties) throws Exception; + Properties properties); /** * Sets the class of the keys @@ -87,27 +89,31 @@ public interface DataStore { * to hold the objects. If the schema is already created previously, * or the underlying data model does not support * or need this operation, the operation is ignored. + * @throws IOException */ - void createSchema() throws Exception; + void createSchema(); /** * Deletes the underlying schema or table (or similar) in the datastore * that holds the objects. This also deletes all the data associated with * the schema. + * @throws IOException */ - void deleteSchema() throws Exception; + void deleteSchema(); /** * Deletes all the data associated with the schema, but keeps the * schema (table or similar) intact. + * @throws IOException */ - void truncateSchema() throws Exception; + void truncateSchema(); /** * Returns whether the schema that holds the data exists in the datastore. * @return whether schema exists + * @throws IOException */ - boolean schemaExists() throws Exception; + boolean schemaExists(); /** * Returns a new instance of the key object. If the object cannot be instantiated @@ -115,59 +121,67 @@ public interface DataStore { * constructor) it throws an exception. Only use this function if you can * make sure that the key class has a no-arg constructor. * @return a new instance of the key object. + * @throws IOException */ - K newKey() throws Exception; + K newKey(); /** * Returns a new instance of the managed persistent object. * @return a new instance of the managed persistent object. + * @throws IOException */ - T newPersistent() throws Exception; + T newPersistent(); /** * Returns the object corresponding to the given key fetching all the fields. * @param key the key of the object * @return the Object corresponding to the key or null if it cannot be found + * @throws IOException */ - T get(K key) throws Exception; + T get(K key); /** * Returns the object corresponding to the given key. * @param key the key of the object * @param fields the fields required in the object. Pass null, to retrieve all fields * @return the Object corresponding to the key or null if it cannot be found + * @throws IOException */ - T get(K key, String[] fields) throws Exception; + T get(K key, String[] fields); /** * Inserts the persistent object with the given key. If an * object with the same key already exists it will silently * be replaced. See also the note on * visibility. + * @throws IOException */ - void put(K key, T obj) throws Exception; + void put(K key, T obj); /** * Deletes the object with the given key * @param key the key of the object * @return whether the object was successfully deleted + * @throws IOException */ - boolean delete(K key) throws Exception; + boolean delete(K key); /** * Deletes all the objects matching the query. * See also the note on visibility. * @param query matching records to this query will be deleted * @return number of deleted records + * @throws IOException */ - long deleteByQuery(Query query) throws Exception; + long deleteByQuery(Query query); /** * Executes the given query and returns the results. * @param query the query to execute. * @return the results as a {@link Result} object. + * @throws IOException */ - Result execute(Query query) throws Exception; + Result execute(Query query); /** * Constructs and returns a new Query. @@ -191,8 +205,9 @@ public interface DataStore { * optimize their writing by deferring the actual put / delete operations * until this moment. * See also the note on visibility. + * @throws IOException */ - void flush() throws Exception; + void flush(); /** * Sets the {@link BeanFactory} to use by the DataStore. @@ -211,11 +226,8 @@ public interface DataStore { * implementation, so that the instance is ready for GC. * All other DataStore methods cannot be used after this * method was called. Subsequent calls of this method are ignored. + * @throws IOException */ - void close() throws IOException, InterruptedException, Exception; - - //void readFields(Object in) throws Exception; - - //void write() throws Exception; + void close(); } Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/DataStoreFactory.java Wed Oct 31 05:53:43 2012 @@ -98,7 +98,7 @@ public class DataStoreFactory{ private static void initializeDataStore( DataStore dataStore, Class keyClass, Class persistent, - Properties properties) throws IOException, Exception { + Properties properties) throws IOException { dataStore.initialize(keyClass, persistent, properties); } Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java Wed Oct 31 05:53:43 2012 @@ -30,6 +30,7 @@ import org.apache.avro.Schema.Field; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.gora.avro.PersistentDatumReader; import org.apache.gora.avro.PersistentDatumWriter; +import org.apache.gora.avro.store.AvroStore; import org.apache.gora.persistency.BeanFactory; import org.apache.gora.persistency.impl.BeanFactoryImpl; import org.apache.gora.persistency.impl.PersistentBase; @@ -41,9 +42,10 @@ import org.apache.gora.util.StringUtils; import org.apache.gora.util.WritableUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Base class for Avro persistent {@link DataStore}s. @@ -71,25 +73,27 @@ implements DataStore, Configurable protected PersistentDatumReader datumReader; protected PersistentDatumWriter datumWriter; + + public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class); public DataStoreBase() { } @Override public void initialize(Class keyClass, Class persistentClass, - Properties properties) throws IOException { - setKeyClass(keyClass); - setPersistentClass(persistentClass); - if(this.beanFactory == null) - this.beanFactory = new BeanFactoryImpl(keyClass, persistentClass); - schema = this.beanFactory.getCachedPersistent().getSchema(); - fieldMap = AvroUtils.getFieldMap(schema); - - autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this); - this.properties = properties; - - datumReader = new PersistentDatumReader(schema, false); - datumWriter = new PersistentDatumWriter(schema, false); + Properties properties) { + setKeyClass(keyClass); + setPersistentClass(persistentClass); + if(this.beanFactory == null) + this.beanFactory = new BeanFactoryImpl(keyClass, persistentClass); + schema = this.beanFactory.getCachedPersistent().getSchema(); + fieldMap = AvroUtils.getFieldMap(schema); + + autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this); + this.properties = properties; + + datumReader = new PersistentDatumReader(schema, false); + datumWriter = new PersistentDatumWriter(schema, false); } @Override @@ -114,20 +118,24 @@ implements DataStore, Configurable } @Override - public K newKey() throws IOException { + public K newKey() { try { return beanFactory.newKey(); } catch (Exception ex) { - throw new IOException(ex); + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + return null; } } @Override - public T newPersistent() throws IOException { + public T newPersistent() { try { return beanFactory.newPersistent(); } catch (Exception ex) { - throw new IOException(ex); + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + return null; } } @@ -142,7 +150,7 @@ implements DataStore, Configurable } @Override - public T get(K key) throws IOException, Exception { + public T get(K key) { return get(key, getFieldsToQuery(null)); }; @@ -176,21 +184,30 @@ implements DataStore, Configurable } @SuppressWarnings("unchecked") - public void readFields(DataInput in) throws IOException { + public void readFields(DataInput in) { try { Class keyClass = (Class) ClassLoadingUtils.loadClass(Text.readString(in)); Class persistentClass = (Class)ClassLoadingUtils.loadClass(Text.readString(in)); Properties props = WritableUtils.readProperties(in); initialize(keyClass, persistentClass, props); } catch (ClassNotFoundException ex) { - throw new IOException(ex); + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + } catch (IOException e) { + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); } } - public void write(DataOutput out) throws IOException { - Text.writeString(out, getKeyClass().getCanonicalName()); - Text.writeString(out, getPersistentClass().getCanonicalName()); - WritableUtils.writeProperties(out, properties); + public void write(DataOutput out) { + try { + Text.writeString(out, getKeyClass().getCanonicalName()); + Text.writeString(out, getPersistentClass().getCanonicalName()); + WritableUtils.writeProperties(out, properties); + } catch (IOException e) { + LOG.error(e.getMessage()); + LOG.error(e.getStackTrace().toString()); + } } @Override @@ -208,7 +225,7 @@ implements DataStore, Configurable @Override /** Default implementation deletes and recreates the schema*/ - public void truncateSchema() throws IOException, Exception { + public void truncateSchema() { deleteSchema(); createSchema(); } Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java Wed Oct 31 05:53:43 2012 @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.gora.avro.store.AvroStore; import org.apache.gora.mapreduce.GoraMapReduceUtils; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.PartitionQuery; @@ -42,6 +43,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Base implementations for {@link FileBackedDataStore} methods. @@ -56,10 +59,12 @@ public abstract class FileBackedDataStor protected InputStream inputStream; protected OutputStream outputStream; + + public static final Logger LOG = LoggerFactory.getLogger(AvroStore.class); @Override public void initialize(Class keyClass, Class persistentClass, - Properties properties) throws IOException { + Properties properties) { super.initialize(keyClass, persistentClass, properties); if(properties != null) { if(this.inputPath == null) { @@ -122,17 +127,28 @@ public InputStream getInputStream() { } /** Opens an OutputStream for the output Hadoop path */ - protected OutputStream createOutputStream() throws IOException { - Path path = new Path(outputPath); - FileSystem fs = path.getFileSystem(getConf()); - return fs.create(path); + protected OutputStream createOutputStream() { + OutputStream conf = null; + try{ + Path path = new Path(outputPath); + FileSystem fs = path.getFileSystem(getConf()); + conf = fs.create(path); + }catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + } + return conf; } protected InputStream getOrCreateInputStream() throws IOException { - if(inputStream == null) { - inputStream = createInputStream(); + try{ + if(inputStream == null) { + inputStream = createInputStream(); + } + return inputStream; + }catch(IOException ex){ + throw new IOException(ex); } - return inputStream; } protected OutputStream getOrCreateOutputStream() throws IOException { @@ -143,25 +159,37 @@ public InputStream getInputStream() { } @Override - public List> getPartitions(Query query) - throws IOException { - List splits = GoraMapReduceUtils.getSplits(getConf(), inputPath); - List> queries = new ArrayList>(splits.size()); - - for(InputSplit split : splits) { - queries.add(new FileSplitPartitionQuery(query, (FileSplit) split)); + public List> getPartitions(Query query){ + List splits = null; + List> queries = null; + try{ + splits = GoraMapReduceUtils.getSplits(getConf(), inputPath); + queries = new ArrayList>(splits.size()); + + for(InputSplit split : splits) { + queries.add(new FileSplitPartitionQuery(query, (FileSplit) split)); + } + }catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); } - return queries; } @Override - public Result execute(Query query) throws IOException { - if(query instanceof FileSplitPartitionQuery) { - return executePartial((FileSplitPartitionQuery) query); - } else { - return executeQuery(query); + public Result execute(Query query) { + Result results = null; + try{ + if(query instanceof FileSplitPartitionQuery) { + results = executePartial((FileSplitPartitionQuery) query); + } else { + results = executeQuery(query); + } + }catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); } + return results; } /** @@ -178,51 +206,66 @@ public InputStream getInputStream() { throws IOException; @Override - public void flush() throws IOException { - if(outputStream != null) - outputStream.flush(); + public void flush() { + try{ + if(outputStream != null) + outputStream.flush(); + }catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + } } @Override - public void createSchema() throws IOException { + public void createSchema() { } @Override - public void deleteSchema() throws IOException { + public void deleteSchema() { throw new OperationNotSupportedException("delete schema is not supported for " + "file backed data stores"); } @Override - public boolean schemaExists() throws IOException { + public boolean schemaExists() { return true; } @Override - public void write(DataOutput out) throws IOException { - super.write(out); - org.apache.gora.util.IOUtils.writeNullFieldsInfo(out, inputPath, outputPath); - if(inputPath != null) - Text.writeString(out, inputPath); - if(outputPath != null) - Text.writeString(out, outputPath); + public void write(DataOutput out) { + try{ + super.write(out); + org.apache.gora.util.IOUtils.writeNullFieldsInfo(out, inputPath, outputPath); + if(inputPath != null) + Text.writeString(out, inputPath); + if(outputPath != null) + Text.writeString(out, outputPath); + }catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + } } @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - boolean[] nullFields = org.apache.gora.util.IOUtils.readNullFieldsInfo(in); - if(!nullFields[0]) - inputPath = Text.readString(in); - if(!nullFields[1]) - outputPath = Text.readString(in); + public void readFields(DataInput in) { + try{ + super.readFields(in); + boolean[] nullFields = org.apache.gora.util.IOUtils.readNullFieldsInfo(in); + if(!nullFields[0]) + inputPath = Text.readString(in); + if(!nullFields[1]) + outputPath = Text.readString(in); + }catch(IOException ex){ + LOG.error(ex.getMessage()); + LOG.error(ex.getStackTrace().toString()); + } } @Override - public void close() throws IOException { - IOUtils.closeStream(inputStream); - IOUtils.closeStream(outputStream); - inputStream = null; - outputStream = null; + public void close() { + IOUtils.closeStream(inputStream); + IOUtils.closeStream(outputStream); + inputStream = null; + outputStream = null; } } Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSBackedDataStoreBase.java Wed Oct 31 05:53:43 2012 @@ -38,9 +38,10 @@ public abstract class WSBackedDataStoreB @Override /** * Initializes a web service backed data store + * @throws IOException */ public void initialize(Class keyClass, Class persistentClass, - Properties properties) throws Exception { + Properties properties) { super.initialize(keyClass, persistentClass, properties); } @@ -48,9 +49,13 @@ public abstract class WSBackedDataStoreB /** * Executes a query inside a web service backed data store */ - public Result execute(Query query) throws Exception { - // TODO We could have different types of execution {@link FileBackedDataStoreBase} - return executeQuery(query); + public Result execute(Query query) { + try { + return executeQuery(query); + } catch (IOException e) { + e.printStackTrace(); + return null; + } } /** @@ -58,27 +63,27 @@ public abstract class WSBackedDataStoreB * for non-PartitionQuery's. */ protected abstract Result executeQuery(Query query) - throws Exception; + throws IOException; @Override /** * Flushes objects into the data store */ - public void flush() throws Exception { + public void flush() { } @Override /** * Creates schema into the data store */ - public void createSchema() throws Exception{ + public void createSchema() { } @Override /** * Deletes schema from the data store */ - public void deleteSchema() throws Exception { + public void deleteSchema() { throw new OperationNotSupportedException("delete schema is not supported for " + "file backed data stores"); } @@ -87,7 +92,7 @@ public abstract class WSBackedDataStoreB /** * Verifies if a schema exists */ - public boolean schemaExists() throws Exception { + public boolean schemaExists() { return true; } @@ -111,6 +116,6 @@ public abstract class WSBackedDataStoreB /** * Closes the data store */ - public void close() throws IOException, InterruptedException, Exception { + public void close() { } } Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreBase.java Wed Oct 31 05:53:43 2012 @@ -20,6 +20,7 @@ */ package org.apache.gora.store.ws.impl; +import java.io.IOException; import java.util.Properties; import org.apache.gora.persistency.Persistent; @@ -68,7 +69,7 @@ implements DataStore{ * Initializes the web services backed data store */ public void initialize(Class keyClass, Class persistentClass, - Properties properties) throws Exception { + Properties properties) { setKeyClass(keyClass); setPersistentClass(persistentClass); } @@ -149,7 +150,7 @@ implements DataStore{ @Override /** Default implementation deletes and recreates the schema*/ - public void truncateSchema() throws Exception { + public void truncateSchema() { deleteSchema(); createSchema(); } Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/store/ws/impl/WSDataStoreFactory.java Wed Oct 31 05:53:43 2012 @@ -145,6 +145,7 @@ public class WSDataStoreFactory{ * @return A new store instance. * @throws GoraException */ + @SuppressWarnings("unchecked") public static , K, T extends Persistent> D createDataStore(Class dataStoreClass, Class keyClass , Class persistent, Object auth, Properties properties, String schemaName) Modified: gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java (original) +++ gora/branches/goraamazon/gora-core/src/main/java/org/apache/gora/util/IOUtils.java Wed Oct 31 05:53:43 2012 @@ -42,7 +42,6 @@ import org.apache.avro.ipc.ByteBufferInp import org.apache.avro.ipc.ByteBufferOutputStream; import org.apache.gora.avro.PersistentDatumReader; import org.apache.gora.avro.PersistentDatumWriter; -import org.apache.gora.persistency.Persistent; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; Modified: gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java (original) +++ gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java Wed Oct 31 05:53:43 2012 @@ -71,11 +71,8 @@ public class GoraTestDriver { */ public void setUp() throws Exception { log.info("setting up test"); - try { - for(DataStore store : dataStores) { - store.truncateSchema(); - } - }catch (IOException ignore) { + for(DataStore store : dataStores) { + store.truncateSchema(); } } Modified: gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java (original) +++ gora/branches/goraamazon/gora-core/src/test/java/org/apache/gora/mock/store/MockDataStore.java Wed Oct 31 05:53:43 2012 @@ -57,49 +57,47 @@ public class MockDataStore extends DataS } @Override - public void close() throws IOException { + public void close() { } @Override - public void createSchema() throws IOException { + public void createSchema() { } @Override - public void deleteSchema() throws IOException { + public void deleteSchema() { } @Override - public void truncateSchema() throws IOException { + public void truncateSchema() { } @Override - public boolean schemaExists() throws IOException { + public boolean schemaExists() { return true; } @Override - public boolean delete(String key) throws IOException { + public boolean delete(String key) { return false; } @Override - public long deleteByQuery(Query query) - throws IOException { + public long deleteByQuery(Query query) { return 0; } @Override - public Result execute( - Query query) throws IOException { + public Result execute(Query query) { return null; } @Override - public void flush() throws IOException { + public void flush() { } @Override - public MockPersistent get(String key, String[] fields) throws IOException { + public MockPersistent get(String key, String[] fields) { return null; } @@ -133,7 +131,7 @@ public class MockDataStore extends DataS } @Override - public void put(String key, MockPersistent obj) throws IOException { + public void put(String key, MockPersistent obj) { } @Override Modified: gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java URL: http://svn.apache.org/viewvc/gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java?rev=1403993&r1=1403992&r2=1403993&view=diff ============================================================================== --- gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java (original) +++ gora/branches/goraamazon/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java Wed Oct 31 05:53:43 2012 @@ -139,8 +139,8 @@ public class DynamoDBQuery keyClass, Class pPersistentClass, - Properties properties) throws Exception { + Properties properties) { try { LOG.debug("Initializing DynamoDB store"); getCredentials(); @@ -172,7 +172,8 @@ public class DynamoDBStore tableElements = root.getChildren("table"); for(Element tableElement : tableElements) { - + String tableName = tableElement.getAttributeValue("name"); long readCapacUnits = Long.parseLong(tableElement.getAttributeValue("readcunit")); long writeCapacUnits = Long.parseLong(tableElement.getAttributeValue("readcunit")); @@ -267,8 +268,8 @@ public class DynamoDBStore execute(Query query) throws Exception { + public Result execute(Query query) { DynamoDBQuery dynamoDBQuery = buildDynamoDBQuery(query); DynamoDBMapper mapper = new DynamoDBMapper(dynamoDBClient); List objList = null; @@ -319,7 +320,7 @@ public class DynamoDBStore query = new DynamoDBQuery(); query.setDataStore(this); //query.setKeyRange(key, key); @@ -331,22 +332,36 @@ public class DynamoDBStore query) throws Exception { + public long deleteByQuery(Query query) { // TODO verify whether or not we are deleting a whole row //String[] fields = getFieldsToQuery(query.getFields()); //find whether all fields are queried, which means that complete @@ -646,15 +691,27 @@ public class DynamoDBStore result = execute(query); ArrayList deletes = new ArrayList(); - while(result.next()) { - T resultObj = result.get(); - deletes.add(resultObj); - - @SuppressWarnings("rawtypes") - DynamoDBKey dKey = new DynamoDBKey(); - dKey.setHashKey(getHashKey(resultObj)); - dKey.setRangeKey(getRangeKey(resultObj)); - delete((K)dKey); + try { + while(result.next()) { + T resultObj = result.get(); + deletes.add(resultObj); + + @SuppressWarnings("rawtypes") + DynamoDBKey dKey = new DynamoDBKey(); + + dKey.setHashKey(getHashKey(resultObj)); + + dKey.setRangeKey(getRangeKey(resultObj)); + delete((K)dKey); + } + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } catch (InvocationTargetException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); } return deletes.size(); } @@ -743,8 +800,12 @@ public class DynamoDBStore