incubator-gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1006024 [4/8] - in /incubator/gora: branches/ tags/ trunk/ trunk/bin/ trunk/conf/ trunk/docs/ trunk/gora-cassandra/ trunk/gora-cassandra/ivy/ trunk/gora-cassandra/lib-ext/ trunk/gora-cassandra/src/ trunk/gora-cassandra/src/examples/ trunk/...
Date Fri, 08 Oct 2010 21:17:17 GMT
Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StateManager.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StateManager.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StateManager.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StateManager.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,93 @@
+
+package org.gora.persistency;
+
+/**
+ * Tracks the persistency state of {@link Persistent} objects: whether an
+ * object is new, which of its fields are dirty and which are readable.
+ */
+public interface StateManager {
+
+  /**
+   * Registers the persistent object handled by this manager. Call this
+   * method when one state manager is allocated per persistent object.
+   * @param persistent the persistent to manage
+   */
+  void setManagedPersistent(Persistent persistent);
+
+  /**
+   * Tells whether the object is newly constructed.
+   * @param persistent the object whose state is queried
+   * @return true if the object is newly constructed, false if it was
+   * retrieved from a datastore
+   */
+  boolean isNew(Persistent persistent);
+
+  /**
+   * Marks the object as new for persistency.
+   * @param persistent the object whose state is changed
+   */
+  void setNew(Persistent persistent);
+
+  /**
+   * Removes the new mark from the object.
+   * @param persistent the object whose state is changed
+   */
+  void clearNew(Persistent persistent);
+
+  /**
+   * Tells whether any field of the object has been modified after
+   * construction or loading.
+   * @param persistent the object whose state is queried
+   * @return whether any of the fields of the object has changed
+   */
+  boolean isDirty(Persistent persistent);
+
+  /**
+   * Tells whether a single field has been modified.
+   * @param persistent the object whose state is queried
+   * @param fieldIndex the offset of the field in the object
+   * @return whether the field has been modified
+   */
+  boolean isDirty(Persistent persistent, int fieldIndex);
+
+  /**
+   * Marks every field of the object as dirty.
+   * @param persistent the object whose state is changed
+   */
+  void setDirty(Persistent persistent);
+
+  /**
+   * Marks a single field as dirty.
+   * @param persistent the object whose state is changed
+   * @param fieldIndex the offset of the field in the object
+   */
+  void setDirty(Persistent persistent, int fieldIndex);
+
+  /**
+   * Removes the dirty mark from a single field.
+   * @param persistent the object whose state is changed
+   * @param fieldIndex the offset of the field in the object
+   */
+  void clearDirty(Persistent persistent, int fieldIndex);
+
+  /**
+   * Removes the dirty mark from the whole object.
+   * @param persistent the object whose state is changed
+   */
+  void clearDirty(Persistent persistent);
+
+  /**
+   * Tells whether a field has been loaded from the datastore.
+   * @param persistent the object whose state is queried
+   * @param fieldIndex the offset of the field in the object
+   * @return whether the field has been loaded
+   */
+  boolean isReadable(Persistent persistent, int fieldIndex);
+
+  /**
+   * Marks a single field as readable.
+   * @param persistent the object whose state is changed
+   * @param fieldIndex the offset of the field in the object
+   */
+  void setReadable(Persistent persistent, int fieldIndex);
+
+  /**
+   * Removes the readable mark from a single field.
+   * @param persistent the object whose state is changed
+   * @param fieldIndex the offset of the field in the object
+   */
+  void clearReadable(Persistent persistent, int fieldIndex);
+
+  /**
+   * Removes the readable mark from the whole object.
+   * @param persistent the object whose state is changed
+   */
+  void clearReadable(Persistent persistent);
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulHashMap.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulHashMap.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulHashMap.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulHashMap.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,81 @@
+package org.gora.persistency;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link HashMap} that additionally records a {@link State} per key, so
+ * that writes and deletions can be tracked for individual entries.
+ */
+@SuppressWarnings("serial")
+public class StatefulHashMap<K, V> extends HashMap<K, V>
+  implements StatefulMap<K, V> {
+
+  /* This is probably a terrible design but I do not yet have a better
+   * idea of managing write/delete info on a per-key basis
+   */
+  private final Map<K, State> keyStates = new HashMap<K, State>();
+
+  /** Creates an empty map with no recorded states. */
+  public StatefulHashMap() {
+    this(null);
+  }
+
+  /**
+   * Creates a map holding a copy of the given entries.
+   * NOTE(review): the entries are copied with {@code super.putAll}, which
+   * bypasses the overridden {@link #putAll(Map)}. Whether the copied keys
+   * still end up with a DIRTY state depends on whether the running JDK's
+   * HashMap.putAll dispatches to the overridden {@link #put(Object, Object)};
+   * confirm which outcome is intended.
+   * @param m the initial entries, or null for an empty map
+   */
+  public StatefulHashMap(Map<K, V> m) {
+    super();
+    if (m == null) {
+      return;
+    }
+    super.putAll(m);
+  }
+
+  /** Stores the mapping and records the key as {@link State#DIRTY}. */
+  @Override
+  public V put(K key, V value) {
+    keyStates.put(key, State.DIRTY);
+    return super.put(key, value);
+  }
+
+  /**
+   * Removes the mapping. The key is recorded as {@link State#DELETED} only
+   * if a state was already recorded for it.
+   * NOTE(review): {@link #clear()} marks every present key as DELETED,
+   * tracked or not; confirm the asymmetry is intended.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public V remove(Object key) {
+    if (keyStates.containsKey(key)) {
+      // safe: the key is already present in keyStates, so it was stored as a K
+      keyStates.put((K) key, State.DELETED);
+    }
+    return super.remove(key);
+  }
+
+  /** Adds every entry through {@link #put(Object, Object)} so each key becomes DIRTY. */
+  @Override
+  public void putAll(Map<? extends K, ? extends V> m) {
+    for (Entry<? extends K, ? extends V> e : m.entrySet()) {
+      put(e.getKey(), e.getValue());
+    }
+  }
+
+  /** Records every present key as {@link State#DELETED}, then empties the map. */
+  @Override
+  public void clear() {
+    for (Entry<K, V> e : entrySet()) {
+      keyStates.put(e.getKey(), State.DELETED);
+    }
+    super.clear();
+  }
+
+  /* (non-Javadoc)
+   * @see org.gora.persistency.StatefulMap#getState(K)
+   */
+  @Override
+  public State getState(K key) {
+    return keyStates.get(key);
+  }
+
+  /* (non-Javadoc)
+   * @see org.gora.persistency.StatefulMap#clearStates()
+   */
+  @Override
+  public void clearStates() {
+    keyStates.clear();
+  }
+
+  /* (non-Javadoc)
+   * @see org.gora.persistency.StatefulMap#putState(K, org.gora.persistency.State)
+   */
+  @Override
+  public void putState(K key, State state) {
+    keyStates.put(key, state);
+  }
+
+  /* (non-Javadoc)
+   * @see org.gora.persistency.StatefulMap#states()
+   */
+  @Override
+  public Map<K, State> states() {
+    // returns the live internal map, not a copy
+    return keyStates;
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulMap.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulMap.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulMap.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/StatefulMap.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,20 @@
+
+package org.gora.persistency;
+
+import java.util.Map;
+
+/**
+ * A {@link Map} that also keeps a persistency {@link State} for
+ * individual elements of the map.
+ */
+public interface StatefulMap<K, V> extends Map<K, V> {
+
+  /**
+   * Returns the state recorded for the given key.
+   * @param key the key to look up
+   * @return the state associated with the key
+   */
+  State getState(K key);
+
+  /**
+   * Records a state for the given key.
+   * @param key the key whose state is set
+   * @param state the state to associate with the key
+   */
+  void putState(K key, State state);
+
+  /**
+   * Returns the key-to-state associations of this map.
+   * @return a map from keys to their states
+   */
+  Map<K, State> states();
+
+  /**
+   * Discards the recorded states.
+   */
+  void clearStates();
+
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/BeanFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/BeanFactoryImpl.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/BeanFactoryImpl.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/BeanFactoryImpl.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,80 @@
+
+package org.gora.persistency.impl;
+
+import java.lang.reflect.Constructor;
+
+import org.gora.persistency.BeanFactory;
+import org.gora.persistency.Persistent;
+import org.gora.util.ReflectionUtils;
+
+/**
+ * Default {@link BeanFactory} implementation. Keys are constructed by
+ * reflection, {@link Persistent} objects by calling
+ * {@link Persistent#newInstance(org.gora.persistency.StateManager)}
+ * on an instance that is created once in the constructor.
+ */
+public class BeanFactoryImpl<K, T extends Persistent> implements BeanFactory<K, T> {
+
+  private final Class<K> keyClass;
+  private final Class<T> persistentClass;
+
+  private final Constructor<K> keyConstructor;
+
+  /* Instances built once in the constructor; returned by the getCached* methods. */
+  private final K key;
+  private final T persistent;
+
+  private final boolean isKeyPersistent;
+
+  /**
+   * Resolves the key constructor and eagerly creates one key and one
+   * persistent instance.
+   * @param keyClass the class of the keys
+   * @param persistentClass the class of the persistent objects
+   * @throws RuntimeException wrapping any exception raised while resolving
+   * the constructor or creating the instances
+   */
+  public BeanFactoryImpl(Class<K> keyClass, Class<T> persistentClass) {
+    this.keyClass = keyClass;
+    this.persistentClass = persistentClass;
+
+    try {
+      this.keyConstructor = ReflectionUtils.getConstructor(keyClass);
+      this.key = keyConstructor.newInstance(ReflectionUtils.EMPTY_OBJECT_ARRAY);
+      this.persistent = ReflectionUtils.newInstance(persistentClass);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    this.isKeyPersistent = Persistent.class.isAssignableFrom(keyClass);
+  }
+
+  /**
+   * Creates a new key: through the cached key's newInstance when the key
+   * class is itself a {@link Persistent}, otherwise through the key
+   * constructor.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public K newKey() throws Exception {
+    if (!isKeyPersistent) {
+      return keyConstructor.newInstance(ReflectionUtils.EMPTY_OBJECT_ARRAY);
+    }
+    Persistent persistentKey = (Persistent) key;
+    return (K) persistentKey.newInstance(new StateManagerImpl());
+  }
+
+  /** Creates a new persistent object with its own {@link StateManagerImpl}. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public T newPersistent() {
+    return (T) persistent.newInstance(new StateManagerImpl());
+  }
+
+  /** Returns the key instance created in the constructor. */
+  @Override
+  public K getCachedKey() {
+    return key;
+  }
+
+  /** Returns the persistent instance created in the constructor. */
+  @Override
+  public T getCachedPersistent() {
+    return persistent;
+  }
+
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  @Override
+  public Class<T> getPersistentClass() {
+    return persistentClass;
+  }
+
+  /** Returns whether the key class is assignable to {@link Persistent}. */
+  public boolean isKeyPersistent() {
+    return isKeyPersistent;
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/PersistentBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/PersistentBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/PersistentBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/PersistentBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,279 @@
+package org.gora.persistency.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.specific.SpecificRecord;
+import org.gora.avro.PersistentDatumReader;
+import org.gora.persistency.ListGenericArray;
+import org.gora.persistency.Persistent;
+import org.gora.persistency.StateManager;
+
+/**
+ * Base class implementing common functionality for Persistent
+ * classes: field-name registration, delegation of new/dirty/readable
+ * state to a {@link StateManager}, and schema-driven clear, equals,
+ * hashCode, clone and toString.
+ */
+public abstract class PersistentBase implements Persistent {
+
+  /** Per class: field name to field index, filled by {@link #registerFields}. */
+  protected static Map<Class<?>, Map<String, Integer>> FIELD_MAP =
+    new HashMap<Class<?>, Map<String,Integer>>();
+
+  /** Per class: field names in index order, filled by {@link #registerFields}. */
+  protected static Map<Class<?>, String[]> FIELDS =
+    new HashMap<Class<?>, String[]>();
+
+  protected static final PersistentDatumReader<Persistent> datumReader =
+    new PersistentDatumReader<Persistent>();
+
+  private StateManager stateManager;
+
+  /** Creates the object with a fresh {@link StateManagerImpl}. */
+  protected PersistentBase() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Creates the object with the given state manager and registers this
+   * object with it through {@link StateManager#setManagedPersistent}.
+   * @param stateManager the state manager to use for this object
+   */
+  protected PersistentBase(StateManager stateManager) {
+    this.stateManager = stateManager;
+    stateManager.setManagedPersistent(this);
+  }
+
+  /** Subclasses should call this function with all the persistable fields
+   * of the class to register them. The index of a field is its position
+   * in the given list.
+   * @param clazz the class whose fields are registered
+   * @param fields the names of the fields, in index order
+   */
+  protected static void registerFields(Class<?> clazz, String... fields) {
+    FIELDS.put(clazz, fields);
+    int fieldsLength = fields == null ? 0 : fields.length;
+    HashMap<String, Integer> map = new HashMap<String, Integer>(fieldsLength);
+
+    for (int i = 0; i < fieldsLength; i++) {
+      map.put(fields[i], i);
+    }
+    FIELD_MAP.put(clazz, map);
+  }
+
+  @Override
+  public StateManager getStateManager() {
+    return stateManager;
+  }
+
+  /** Returns the field names registered for the runtime class of this object. */
+  @Override
+  public String[] getFields() {
+    return FIELDS.get(getClass());
+  }
+
+  @Override
+  public String getField(int index) {
+    return FIELDS.get(getClass())[index];
+  }
+
+  /**
+   * Returns the index registered for the given field name. The looked-up
+   * Integer is unboxed, so a name that was never registered results in a
+   * NullPointerException.
+   */
+  @Override
+  public int getFieldIndex(String field) {
+    return FIELD_MAP.get(getClass()).get(field);
+  }
+
+  /**
+   * Resets the fields according to their schema type and then clears the
+   * dirty and readable state. Maps, arrays and nested records are cleared
+   * in place when present, numeric and boolean fields are set to zero/false,
+   * and remaining non-NULL types are set to null.
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public void clear() {
+    List<Field> fields = getSchema().getFields();
+    int fieldCount = getFields().length;
+
+    for (int i = 0; i < fieldCount; i++) {
+      switch (fields.get(i).schema().getType()) {
+        case MAP: {
+          Object map = get(i);
+          if (map != null) {
+            ((Map) map).clear();
+          }
+          break;
+        }
+        case ARRAY: {
+          Object array = get(i);
+          if (array != null) {
+            if (array instanceof ListGenericArray) {
+              ((ListGenericArray) array).clear();
+            } else {
+              // other array types are replaced by a new, empty ListGenericArray
+              put(i, new ListGenericArray(fields.get(i).schema()));
+            }
+          }
+          break;
+        }
+        case RECORD: {
+          Persistent field = ((Persistent) get(i));
+          if (field != null) {
+            field.clear();
+          }
+          break;
+        }
+        case BOOLEAN: put(i, false); break;
+        case INT    : put(i, 0); break;
+        case DOUBLE : put(i, 0d); break;
+        case FLOAT  : put(i, 0f); break;
+        case LONG   : put(i, 0L); break;
+        case NULL   : break;
+        default     : put(i, null); break;
+      }
+    }
+    clearDirty();
+    clearReadable();
+  }
+
+  /* ---- new / dirty / readable state: delegated to the state manager ---- */
+
+  @Override
+  public boolean isNew() {
+    return getStateManager().isNew(this);
+  }
+
+  @Override
+  public void setNew() {
+    getStateManager().setNew(this);
+  }
+
+  @Override
+  public void clearNew() {
+    getStateManager().clearNew(this);
+  }
+
+  @Override
+  public boolean isDirty() {
+    return getStateManager().isDirty(this);
+  }
+
+  @Override
+  public boolean isDirty(int fieldIndex) {
+    return getStateManager().isDirty(this, fieldIndex);
+  }
+
+  @Override
+  public boolean isDirty(String field) {
+    return isDirty(getFieldIndex(field));
+  }
+
+  @Override
+  public void setDirty() {
+    getStateManager().setDirty(this);
+  }
+
+  @Override
+  public void setDirty(int fieldIndex) {
+    getStateManager().setDirty(this, fieldIndex);
+  }
+
+  @Override
+  public void setDirty(String field) {
+    setDirty(getFieldIndex(field));
+  }
+
+  @Override
+  public void clearDirty(int fieldIndex) {
+    getStateManager().clearDirty(this, fieldIndex);
+  }
+
+  @Override
+  public void clearDirty(String field) {
+    clearDirty(getFieldIndex(field));
+  }
+
+  @Override
+  public void clearDirty() {
+    getStateManager().clearDirty(this);
+  }
+
+  @Override
+  public boolean isReadable(int fieldIndex) {
+    return getStateManager().isReadable(this, fieldIndex);
+  }
+
+  @Override
+  public boolean isReadable(String field) {
+    return isReadable(getFieldIndex(field));
+  }
+
+  @Override
+  public void setReadable(int fieldIndex) {
+    getStateManager().setReadable(this, fieldIndex);
+  }
+
+  @Override
+  public void setReadable(String field) {
+    setReadable(getFieldIndex(field));
+  }
+
+  @Override
+  public void clearReadable() {
+    getStateManager().clearReadable(this);
+  }
+
+  @Override
+  public void clearReadable(int fieldIndex) {
+    getStateManager().clearReadable(this, fieldIndex);
+  }
+
+  @Override
+  public void clearReadable(String field) {
+    clearReadable(getFieldIndex(field));
+  }
+
+  /**
+   * Compares this record with another {@link SpecificRecord}: the schemas
+   * must be equal and the hash codes must match.
+   * NOTE(review): equality is decided by hash code alone, so two records
+   * whose field values differ but hash to the same value compare equal.
+   * Left unchanged because callers may rely on it; consider a field-wise
+   * comparison.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof SpecificRecord)) return false;
+
+    SpecificRecord r2 = (SpecificRecord)o;
+    if (!this.getSchema().equals(r2.getSchema())) return false;
+
+    return this.hashCode() == r2.hashCode();
+  }
+
+  /** Combines the hash codes of all schema fields, in schema order. */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    List<Field> fields = this.getSchema().getFields();
+    int end = fields.size();
+    for (int i = 0; i < end; i++) {
+      result = prime * result + getFieldHashCode(i, fields.get(i));
+    }
+    return result;
+  }
+
+  /** Hash of one field value: 0 for null, position-independent for BYTES. */
+  private int getFieldHashCode(int i, Field field) {
+    Object o = get(i);
+    if (o == null) {
+      return 0;
+    }
+
+    if (field.schema().getType() == Type.BYTES) {
+      return getByteBufferHashCode((ByteBuffer)o);
+    }
+
+    return o.hashCode();
+  }
+
+  /** ByteBuffer.hashCode() takes into account the position of the
+   * buffer, but we do not want that. Hashes the bytes from index 0 up to
+   * the limit. */
+  private int getByteBufferHashCode(ByteBuffer buf) {
+    int h = 1;
+    // buf.get(int) takes an index relative to the buffer itself, so the
+    // lower bound is 0. The previous bound, buf.arrayOffset(), is an offset
+    // into the backing array: it skipped bytes of sliced buffers and threw
+    // for buffers without an accessible backing array.
+    for (int j = buf.limit() - 1; j >= 0; j--) {
+      h = 31 * h + buf.get(j);
+    }
+    return h;
+  }
+
+  /** Returns a copy of this object produced by the shared datum reader. */
+  @Override
+  public Persistent clone() {
+    return datumReader.clone(this, getSchema());
+  }
+
+  /** Renders the default Object string followed by one "name":"value" line per schema field. */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(super.toString());
+    builder.append(" {\n");
+    List<Field> fields = getSchema().getFields();
+    for (int i = 0; i < fields.size(); i++) {
+      builder.append("  \"").append(fields.get(i).name()).append("\":\"");
+      builder.append(get(i)).append("\"\n");
+    }
+    builder.append("}");
+    return builder.toString();
+  }
+
+  /**
+   * Null-safe comparison of the current value of a field with a candidate
+   * value.
+   * @param index the index of the field
+   * @param value the value to compare with the current field value
+   * @return true if both are null or {@code value.equals(current)}
+   */
+  protected boolean isFieldEqual(int index, Object value) {
+    Object old = get(index);
+    if (old == null && value == null)
+      return true;
+    if (old == null || value == null)
+      return false;
+    return value.equals(old);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/StateManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/StateManagerImpl.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/StateManagerImpl.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/persistency/impl/StateManagerImpl.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,87 @@
+
+package org.gora.persistency.impl;
+
+import java.util.BitSet;
+
+import org.gora.persistency.Persistent;
+import org.gora.persistency.StateManager;
+
+/**
+ * An implementation for the StateManager. This implementation assumes
+ * every Persistent object has its own StateManager, so the persistent
+ * argument of most methods is not consulted.
+ */
+public class StateManagerImpl implements StateManager {
+
+  //TODO: serialize isNew in PersistentSerializer
+  protected boolean isNew;
+  /** One bit per field: set when the field is dirty. */
+  protected BitSet dirtyBits;
+  /** One bit per field: set when the field is readable. */
+  protected BitSet readableBits;
+
+  public StateManagerImpl() {
+  }
+
+  /**
+   * Allocates the dirty and readable bit sets, sized by the number of
+   * fields in the schema of the given persistent, and marks the state as
+   * new. Must be called before the dirty/readable methods are used.
+   */
+  @Override
+  public void setManagedPersistent(Persistent persistent) {
+    int fieldCount = persistent.getSchema().getFields().size();
+    dirtyBits = new BitSet(fieldCount);
+    readableBits = new BitSet(fieldCount);
+    isNew = true;
+  }
+
+  @Override
+  public boolean isNew(Persistent persistent) {
+    return isNew;
+  }
+
+  @Override
+  public void setNew(Persistent persistent) {
+    this.isNew = true;
+  }
+
+  @Override
+  public void clearNew(Persistent persistent) {
+    this.isNew = false;
+  }
+
+  /** Marks the field as dirty; a dirty field is also marked readable. */
+  @Override
+  public void setDirty(Persistent persistent, int fieldIndex) {
+    dirtyBits.set(fieldIndex);
+    readableBits.set(fieldIndex);
+  }
+
+  @Override
+  public boolean isDirty(Persistent persistent, int fieldIndex) {
+    return dirtyBits.get(fieldIndex);
+  }
+
+  @Override
+  public boolean isDirty(Persistent persistent) {
+    return !dirtyBits.isEmpty();
+  }
+
+  /**
+   * Marks all the fields of the object as dirty.
+   * @param persistent the object whose fields are marked; its schema
+   * supplies the number of fields
+   */
+  @Override
+  public void setDirty(Persistent persistent) {
+    // BitSet.size() is the allocated capacity, not the number of fields.
+    // Using it set bits beyond the last field, so isDirty() stayed true even
+    // after every field had been cleared individually. Use the real field
+    // count; keep the old bound only when no persistent is supplied.
+    int fieldCount = persistent != null
+        ? persistent.getSchema().getFields().size()
+        : dirtyBits.size();
+    dirtyBits.set(0, fieldCount);
+  }
+
+  @Override
+  public void clearDirty(Persistent persistent, int fieldIndex) {
+    dirtyBits.clear(fieldIndex);
+  }
+
+  @Override
+  public void clearDirty(Persistent persistent) {
+    dirtyBits.clear();
+  }
+
+  @Override
+  public void setReadable(Persistent persistent, int fieldIndex) {
+    readableBits.set(fieldIndex);
+  }
+
+  @Override
+  public boolean isReadable(Persistent persistent, int fieldIndex) {
+    return readableBits.get(fieldIndex);
+  }
+
+  @Override
+  public void clearReadable(Persistent persistent, int fieldIndex) {
+    readableBits.clear(fieldIndex);
+  }
+
+  @Override
+  public void clearReadable(Persistent persistent) {
+    readableBits.clear();
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/PartitionQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/PartitionQuery.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/PartitionQuery.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/PartitionQuery.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,21 @@
+
+package org.gora.query;
+
+import org.gora.persistency.Persistent;
+
+/**
+ * A PartitionQuery splits the results of a Query into multiple partitions,
+ * so that queries can be run locally on the nodes that hold the data.
+ * PartitionQuery objects are used for generating Hadoop InputSplits.
+ */
+public interface PartitionQuery<K, T extends Persistent> extends Query<K, T> {
+
+  /* PartitionQuery interface relaxes the dependency of DataStores to Hadoop */
+
+  /**
+   * Returns the locations on which this partial query will run locally.
+   * @return the addresses of machines
+   */
+  String[] getLocations();
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Query.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Query.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Query.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Query.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,136 @@
+
+package org.gora.query;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.gora.persistency.Persistent;
+import org.gora.store.DataStore;
+
+/**
+ * A query to a data store to retrieve objects. Queries are constructed by
+ * the DataStore implementation via {@link DataStore#newQuery()}.
+ */
+public interface Query<K, T extends Persistent> extends Writable, Configurable {
+
+  /**
+   * Sets the dataStore of this query. Under normal operation, this call
+   * is not necessary and it is potentially dangerous. So use this
+   * method only if you know what you are doing.
+   * @param dataStore the dataStore of the query
+   */
+  void setDataStore(DataStore<K,T> dataStore);
+
+  /**
+   * Returns the DataStore that this Query is associated with.
+   * @return the DataStore of the Query
+   */
+  DataStore<K,T> getDataStore();
+
+  /**
+   * Executes the Query on the DataStore and returns the results.
+   * @return the {@link Result} for the query.
+   * @throws IOException if the query cannot be executed
+   */
+  Result<K,T> execute() throws IOException;
+
+//  /**
+//   * Compiles the query for performance and error checking. This 
+//   * method is an optional optimization for DataStore implementations.
+//   */
+//  public abstract void compile();
+//  
+//  /**
+//   * Sets the query string
+//   * @param queryString the query in String
+//   */
+//  public abstract void setQueryString(String queryString);
+//  
+//  /**
+//   * Returns the query string
+//   * @return the query as String
+//   */
+//  public abstract String getQueryString();
+
+  /* Dimension : fields */
+
+  /** Sets the names of the fields of this query. */
+  void setFields(String... fieldNames);
+
+  /** Returns the names of the fields of this query. */
+  String[] getFields();
+
+  /* Dimension : key */
+
+  /** Sets the key of this query. */
+  void setKey(K key);
+
+  /** Sets the start key of this query. */
+  void setStartKey(K startKey);
+
+  /** Sets the end key of this query. */
+  void setEndKey(K endKey);
+
+  /** Sets the start key and the end key of this query. */
+  void setKeyRange(K startKey, K endKey);
+
+  /** Returns the key of this query. */
+  K getKey();
+
+  /** Returns the start key of this query. */
+  K getStartKey();
+
+  /** Returns the end key of this query. */
+  K getEndKey();
+
+  /* Dimension : time */
+
+  /** Sets the timestamp of this query. */
+  void setTimestamp(long timestamp);
+
+  /** Sets the start time of this query. */
+  void setStartTime(long startTime);
+
+  /** Sets the end time of this query. */
+  void setEndTime(long endTime);
+
+  /** Sets the start time and the end time of this query. */
+  void setTimeRange(long startTime, long endTime);
+
+  /** Returns the timestamp of this query. */
+  long getTimestamp();
+
+  /** Returns the start time of this query. */
+  long getStartTime();
+
+  /** Returns the end time of this query. */
+  long getEndTime();
+
+//  public abstract void setFilter(String filter);
+//  
+//  public abstract String getFilter();
+
+  /**
+   * Sets the maximum number of results to return.
+   * @param limit the maximum number of results
+   */
+  void setLimit(long limit);
+
+  /**
+   * Returns the maximum number of results.
+   * @return the limit if it is set, otherwise a negative number
+   */
+  long getLimit();
+
+  /* parameters */
+  /*
+  public abstract void setParam(int paramIndex, int value);
+  
+  public abstract void setParam(String paramName, int value);
+  
+  public abstract void setParam(int paramIndex, long value);
+  
+  public abstract void setParam(String paramName, long value);
+  
+  public abstract void setParam(int paramIndex, String value);
+  
+  public abstract void setParam(String paramName, String value);
+  
+  public abstract void setParam(int paramIndex, boolean value);
+  
+  public abstract void setParam(String paramName, boolean value);
+  
+  public abstract void setParam(int paramIndex, double value);
+  
+  public abstract void setParam(String paramName, double value);
+  
+  public abstract void setParam(int paramIndex, char value);
+  
+  public abstract void setParam(String paramName, char value);
+  
+  public abstract void setParam(int paramIndex, Date value);
+  
+  public abstract void setParam(String paramName, Date value);
+  */
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Result.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Result.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Result.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/Result.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,73 @@
+
+package org.gora.query;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.gora.persistency.Persistent;
+import org.gora.store.DataStore;
+
+/**
+ * The result of a {@link Query}. The objects in the result set are
+ * iterated by calling {@link #next()}, and read with {@link #get()}
+ * and {@link #getKey()}.
+ */
+public interface Result<K,T extends Persistent> extends Closeable {
+
+  /**
+   * Returns the DataStore that this Result is associated with.
+   * @return the DataStore of the Result
+   */
+  DataStore<K,T> getDataStore();
+
+  /**
+   * Returns the Query object for this Result.
+   * @return the Query object for this Result.
+   */
+  Query<K, T> getQuery();
+
+  /**
+   * Advances to the next element and returns false if at end.
+   * @return true if end is not reached yet
+   * @throws IOException if advancing fails
+   */
+  boolean next() throws IOException;
+
+  /**
+   * Returns the current key.
+   * @return current key
+   */
+  K getKey();
+
+  /**
+   * Returns the current object.
+   * @return current object
+   */
+  T get();
+
+  /**
+   * Returns the class of the keys.
+   * @return class of the keys
+   */
+  Class<K> getKeyClass();
+
+  /**
+   * Returns the class of the persistent objects.
+   * @return class of the persistent objects
+   */
+  Class<T> getPersistentClass();
+
+  /**
+   * Returns the number of times next() is called with return value true.
+   * @return the number of results so far
+   */
+  long getOffset();
+
+  /**
+   * Returns how far along the result has iterated, a value between 0 and 1.
+   * @return the progress of the iteration
+   * @throws IOException if the progress cannot be determined
+   */
+  float getProgress() throws IOException;
+
+  @Override
+  void close() throws IOException;
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/FileSplitPartitionQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/FileSplitPartitionQuery.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/FileSplitPartitionQuery.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/FileSplitPartitionQuery.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,68 @@
+
+package org.gora.query.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+
+/**
+ * Keeps a {@link FileSplit} to represent the partition boundaries.
+ * FileSplitPartitionQuery is best used with existing {@link InputFormat}s.
+ */
+public class FileSplitPartitionQuery<K, T extends Persistent>
+  extends PartitionQueryImpl<K,T> {
+
+  /** The wrapped split; null after the no-arg constructor until readFields is called. */
+  private FileSplit split;
+
+  /** Creates an empty query; the split is populated by {@link #readFields(DataInput)}. */
+  public FileSplitPartitionQuery() {
+    super();
+  }
+
+  /**
+   * Creates a partition query over the given split, using the locations
+   * reported by the split.
+   * @param baseQuery the query this partition belongs to
+   * @param split the file split describing the partition
+   * @throws IOException if the locations of the split cannot be obtained
+   */
+  public FileSplitPartitionQuery(Query<K, T> baseQuery, FileSplit split)
+    throws IOException {
+    super(baseQuery, split.getLocations());
+    this.split = split;
+  }
+
+  public FileSplit getSplit() {
+    return split;
+  }
+
+  /** Returns the length reported by the split. */
+  public long getLength() {
+    return split.getLength();
+  }
+
+  /** Returns the start reported by the split. */
+  public long getStart() {
+    return split.getStart();
+  }
+
+  /** Writes the superclass state followed by the split. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    split.write(out);
+  }
+
+  /** Reads the superclass state followed by the split, creating the split if needed. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    if (split == null) {
+      split = new FileSplit(null, 0, 0, null); //change to new FileSplit() once hadoop-core.jar is updated
+    }
+    split.readFields(in);
+  }
+
+  /**
+   * Two FileSplitPartitionQuery objects are equal when the superclass
+   * considers them equal and their splits are equal. A missing split only
+   * equals another missing split.
+   * NOTE(review): hashCode is not overridden here; confirm the inherited
+   * one is consistent with this method.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof FileSplitPartitionQuery)) {
+      return false;
+    }
+    if (!super.equals(obj)) {
+      return false;
+    }
+    FileSplit otherSplit = ((FileSplitPartitionQuery<?, ?>) obj).split;
+    // split is null between the no-arg constructor and readFields();
+    // compare null-safely instead of dereferencing it
+    return split == null ? otherSplit == null : split.equals(otherSplit);
+  }
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/PartitionQueryImpl.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/PartitionQueryImpl.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/PartitionQueryImpl.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/PartitionQueryImpl.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,145 @@
+
+package org.gora.query.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.gora.persistency.Persistent;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.store.DataStore;
+import org.gora.util.IOUtils;
+
+/**
+ * Implementation for {@link PartitionQuery}.
+ */
+public class PartitionQueryImpl<K, T extends Persistent>
+  extends QueryBase<K, T> implements PartitionQuery<K, T> {
+
+  protected Query<K, T> baseQuery;
+  protected String[] locations;
+
+  public PartitionQueryImpl() {
+    super(null);
+  }
+
+  public PartitionQueryImpl(Query<K, T> baseQuery, String... locations) {
+    this(baseQuery, null, null, locations);
+  }
+
+  public PartitionQueryImpl(Query<K, T> baseQuery, K startKey, K endKey,
+      String... locations) {
+    super(baseQuery.getDataStore());
+    this.baseQuery = baseQuery;
+    this.locations = locations;
+    setStartKey(startKey);
+    setEndKey(endKey);
+    this.dataStore = baseQuery.getDataStore();
+  }
+
+  @Override
+public String[] getLocations() {
+    return locations;
+  }
+
+  public Query<K, T> getBaseQuery() {
+    return baseQuery;
+  }
+
+  /* Override everything except start-key/end-key */
+
+  @Override
+  public String[] getFields() {
+    return baseQuery.getFields();
+  }
+
+  @Override
+  public DataStore<K, T> getDataStore() {
+    return baseQuery.getDataStore();
+  }
+
+  @Override
+  public long getTimestamp() {
+    return baseQuery.getTimestamp();
+  }
+
+  @Override
+  public long getStartTime() {
+    return baseQuery.getStartTime();
+  }
+
+  @Override
+  public long getEndTime() {
+    return baseQuery.getEndTime();
+  }
+
+  @Override
+  public long getLimit() {
+    return baseQuery.getLimit();
+  }
+
+  @Override
+  public void setFields(String... fields) {
+    baseQuery.setFields(fields);
+  }
+
+  @Override
+  public void setTimestamp(long timestamp) {
+    baseQuery.setTimestamp(timestamp);
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    baseQuery.setStartTime(startTime);
+  }
+
+  @Override
+  public void setEndTime(long endTime) {
+    baseQuery.setEndTime(endTime);
+  }
+
+  @Override
+  public void setTimeRange(long startTime, long endTime) {
+    baseQuery.setTimeRange(startTime, endTime);
+  }
+
+  @Override
+  public void setLimit(long limit) {
+    baseQuery.setLimit(limit);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    IOUtils.serialize(null, out, baseQuery);
+    IOUtils.writeStringArray(out, locations);
+  }
+
+  /**
+   * Restores this partition query from the given input: the inherited
+   * query state, then the base query, then the locations. The fields are
+   * read in the order in which {@link #write(DataOutput)} emits them.
+   * @param in the input to read from
+   * @throws IOException if reading fails or the serialized base query
+   * class cannot be found
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    try {
+      baseQuery = IOUtils.deserialize(null, in, null);
+    } catch (ClassNotFoundException ex) {
+      throw new IOException(ex);
+    }
+    locations = IOUtils.readStringArray(in);
+    //super.readFields() instantiated its own data store; replace it with the
+    //base query's data store so that both refer to the same instance.
+    //NOTE(review): skipping super.readFields() would avoid creating that
+    //temporary data store, but would change the serialized layout
+    this.dataStore = baseQuery.getDataStore();
+  }
+
+  /**
+   * Compares this partition query with another object. Two partition
+   * queries are equal when they wrap equal base queries, report the same
+   * locations and cover the same key range. The key range is the only
+   * query state this class keeps itself rather than delegating to the base
+   * query, so it has to take part in the comparison; it is also part of
+   * the inherited hashCode().
+   * @param obj the object to compare with
+   * @return true if obj is an equal PartitionQueryImpl
+   */
+  @Override
+  @SuppressWarnings({ "rawtypes" })
+  public boolean equals(Object obj) {
+    if(!(obj instanceof PartitionQueryImpl)) {
+      return false;
+    }
+    PartitionQueryImpl that = (PartitionQueryImpl) obj;
+    return nullSafeEquals(this.baseQuery, that.baseQuery)
+      && Arrays.equals(locations, that.locations)
+      && nullSafeEquals(this.startKey, that.startKey)
+      && nullSafeEquals(this.endKey, that.endKey);
+  }
+
+  /** Returns whether both arguments are null or a.equals(b) holds. */
+  private static boolean nullSafeEquals(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/QueryBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/QueryBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/QueryBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/QueryBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,296 @@
+
+package org.gora.query.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.store.DataStore;
+import org.gora.util.IOUtils;
+
+/**
+ * Base class for Query implementations.
+ */
+public abstract class QueryBase<K, T extends Persistent>
+implements Query<K,T> {
+
+  protected DataStore<K,T> dataStore;
+
+  protected String queryString;
+  protected String[] fields;
+
+  protected K startKey;
+  protected K endKey;
+
+  protected long startTime = -1;
+  protected long endTime = -1;
+
+  protected String filter;
+
+  protected long limit = -1;
+
+  protected boolean isCompiled = false;
+
+  private Configuration conf;
+
+  public QueryBase(DataStore<K,T> dataStore) {
+    this.dataStore = dataStore;
+  }
+
+  @Override
+  public Result<K,T> execute() throws IOException {
+    //compile();
+    return dataStore.execute(this);
+  }
+
+//  @Override
+//  public void compile() {
+//    if(!isCompiled) {
+//      isCompiled = true;
+//    }
+//  }
+
+  @Override
+  public void setDataStore(DataStore<K, T> dataStore) {
+    this.dataStore = dataStore;
+  }
+
+  @Override
+  public DataStore<K, T> getDataStore() {
+    return dataStore;
+  }
+
+//  @Override
+//  public void setQueryString(String queryString) {
+//    this.queryString = queryString;
+//  }
+//
+//  @Override
+//  public String getQueryString() {
+//    return queryString;
+//  }
+
+  @Override
+  public void setFields(String... fields) {
+    this.fields = fields;
+  }
+
+  @Override
+public String[] getFields() {
+    return fields;
+  }
+
+  @Override
+  public void setKey(K key) {
+    setKeyRange(key, key);
+  }
+
+  @Override
+  public void setStartKey(K startKey) {
+    this.startKey = startKey;
+  }
+
+  @Override
+  public void setEndKey(K endKey) {
+    this.endKey = endKey;
+  }
+
+  @Override
+  public void setKeyRange(K startKey, K endKey) {
+    this.startKey = startKey;
+    this.endKey = endKey;
+  }
+
+  /**
+   * Returns the single key this query is restricted to.
+   * @return the start key if it is the same as, or equal to, the end key;
+   * null if the keys differ or none is set
+   */
+  @Override
+  public K getKey() {
+    if(startKey == endKey) {
+      return startKey; //same instance, or both null
+    }
+    //readFields() deserializes the two keys separately, so equal keys may be
+    //distinct instances; fall back to equals()
+    if(startKey != null && startKey.equals(endKey)) {
+      return startKey;
+    }
+    return null;
+  }
+
+  @Override
+  public K getStartKey() {
+    return startKey;
+  }
+
+  @Override
+  public K getEndKey() {
+    return endKey;
+  }
+
+  @Override
+  public void setTimestamp(long timestamp) {
+    setTimeRange(timestamp, timestamp);
+  }
+
+  @Override
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
+  }
+
+  @Override
+  public void setTimeRange(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+  }
+
+  @Override
+  public long getTimestamp() {
+    return startTime == endTime ? startTime : -1;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public long getEndTime() {
+    return endTime;
+  }
+
+//  @Override
+//  public void setFilter(String filter) {
+//    this.filter = filter;
+//  }
+//
+//  @Override
+//  public String getFilter() {
+//    return filter;
+//  }
+
+  @Override
+  public void setLimit(long limit) {
+    this.limit = limit;
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Restores this query from the given input. The data store is
+   * instantiated from its serialized class name and reads its own state
+   * first; the optional fields follow, preceded by a null-flag array, and
+   * finally the time range and the limit.
+   * @param in the input to read from
+   * @throws IOException if reading fails or the data store class cannot
+   * be found
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    String dataStoreClass = Text.readString(in);
+    try {
+      dataStore = (DataStore<K, T>) ReflectionUtils.newInstance(
+          Class.forName(dataStoreClass), conf);
+      dataStore.readFields(in);
+    } catch (ClassNotFoundException ex) {
+      throw new IOException(ex);
+    }
+
+    boolean[] nullFields = IOUtils.readNullFieldsInfo(in);
+
+    //reset the optional fields first, so that a reused instance does not
+    //keep stale values for fields that are flagged as null in the input
+    queryString = null;
+    fields = null;
+    startKey = null;
+    endKey = null;
+    filter = null;
+
+    if(!nullFields[0])
+      queryString = Text.readString(in);
+    if(!nullFields[1])
+      fields = IOUtils.readStringArray(in);
+    if(!nullFields[2])
+      startKey = IOUtils.deserialize(null, in, null, dataStore.getKeyClass());
+    if(!nullFields[3])
+      endKey = IOUtils.deserialize(null, in, null, dataStore.getKeyClass());
+    if(!nullFields[4])
+      filter = Text.readString(in);
+
+    startTime = WritableUtils.readVLong(in);
+    endTime = WritableUtils.readVLong(in);
+    limit = WritableUtils.readVLong(in);
+  }
+
+  /**
+   * Serializes this query: the data store's class name and state, a
+   * null-flag array for the optional fields, the optional fields that are
+   * set, and finally the time range and the limit.
+   * @param out the output to write to
+   * @throws IOException if writing fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    //write datastore; readFields() loads the class with Class.forName(),
+    //which expects the binary name. The canonical name differs from it for
+    //nested classes and is null for anonymous or local classes
+    Text.writeString(out, dataStore.getClass().getName());
+    dataStore.write(out);
+
+    IOUtils.writeNullFieldsInfo(out, queryString, (fields)
+        , startKey, endKey, filter);
+
+    if(queryString != null)
+      Text.writeString(out, queryString);
+    if(fields != null)
+      IOUtils.writeStringArray(out, fields);
+    if(startKey != null)
+      IOUtils.serialize(getConf(), out, startKey, dataStore.getKeyClass());
+    if(endKey != null)
+      IOUtils.serialize(getConf(), out, endKey, dataStore.getKeyClass());
+    if(filter != null)
+      Text.writeString(out, filter);
+
+    WritableUtils.writeVLong(out, getStartTime());
+    WritableUtils.writeVLong(out, getEndTime());
+    WritableUtils.writeVLong(out, getLimit());
+  }
+
+  /**
+   * Compares this query with another object. Two queries are equal when
+   * their data store, query string, fields, key range, time range, filter
+   * and limit are all equal.
+   * @param obj the object to compare with
+   * @return true if obj is an equal QueryBase
+   */
+  @SuppressWarnings({ "rawtypes" })
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof QueryBase) {
+      QueryBase that = (QueryBase) obj;
+      EqualsBuilder builder = new EqualsBuilder();
+      builder.append(dataStore, that.dataStore);
+      builder.append(queryString, that.queryString);
+      builder.append(fields, that.fields);
+      builder.append(startKey, that.startKey);
+      builder.append(endKey, that.endKey);
+      //queries over different time ranges select different data
+      builder.append(startTime, that.startTime);
+      builder.append(endTime, that.endTime);
+      builder.append(filter, that.filter);
+      builder.append(limit, that.limit);
+      return builder.isEquals();
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+    builder.append(dataStore);
+    builder.append(queryString);
+    builder.append(fields);
+    builder.append(startKey);
+    builder.append(endKey);
+    builder.append(filter);
+    builder.append(limit);
+    return builder.toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    ToStringBuilder builder = new ToStringBuilder(this);
+    builder.append("dataStore", dataStore);
+    builder.append("fields", fields);
+    builder.append("startKey", startKey);
+    builder.append("endKey", endKey);
+    builder.append("filter", filter);
+    builder.append("limit", limit);
+
+    return builder.toString();
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/ResultBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/ResultBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/ResultBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/query/impl/ResultBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,117 @@
+
+package org.gora.query.impl;
+
+import java.io.IOException;
+
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.store.DataStore;
+
+/**
+ * Base class for {@link Result} implementations.
+ */
+public abstract class ResultBase<K, T extends Persistent> 
+  implements Result<K, T> {
+
+  protected final DataStore<K,T> dataStore;
+  
+  protected final Query<K, T> query;
+  
+  protected K key;
+  
+  protected T persistent;
+  
+  /** Query limit */
+  protected long limit;
+  
+  /** How far we have proceeded*/
+  protected long offset = 0;
+  
+  public ResultBase(DataStore<K,T> dataStore, Query<K,T> query) {
+    this.dataStore = dataStore;
+    this.query = query;
+    this.limit = query.getLimit();
+  }
+  
+  @Override
+  public DataStore<K, T> getDataStore() {
+    return dataStore;
+  }
+  
+  @Override
+  public Query<K, T> getQuery() {
+    return query;
+  }
+  
+  @Override
+  public T get() {
+    return persistent;
+  }
+  
+  @Override
+  public K getKey() {
+    return key;
+  }
+    
+  @Override
+  public Class<K> getKeyClass() {
+    return getDataStore().getKeyClass();
+  }
+  
+  @Override
+  public Class<T> getPersistentClass() {
+    return getDataStore().getPersistentClass();
+  }
+  
+  /**
+   * Tells whether this result has already returned as many objects as the
+   * query limit allows. A limit that is not positive never counts as
+   * reached.
+   * @return true if a positive limit is set and the offset has reached it
+   */
+  protected boolean isLimitReached() {
+    return limit > 0 && offset >= limit;
+  }
+  
+  /**
+   * Clears the current persistent object, if there is one, and the
+   * current key when the key is itself a {@link Persistent}.
+   */
+  protected void clear() {
+    if(persistent != null) {
+      persistent.clear();
+    }
+    //instanceof is false for a null key, so no separate null check is needed
+    if(key instanceof Persistent) {
+      Persistent persistentKey = (Persistent) key;
+      persistentKey.clear();
+    }
+  }
+  
+  /**
+   * Advances to the next result. Returns false without reading anything
+   * once the query limit is reached; otherwise clears the current key and
+   * persistent object, makes sure a persistent instance is available and
+   * delegates the actual read to {@link #nextInner()}.
+   * @return true if a result was read, false otherwise
+   * @throws IOException if reading the next result fails
+   */
+  @Override
+  public final boolean next() throws IOException {
+    if(isLimitReached()) {
+      return false;
+    }
+    
+    clear();
+    persistent = getOrCreatePersistent(persistent);
+    
+    boolean ret = nextInner();
+    //only successful reads count towards the limit
+    if(ret) ++offset;
+    return ret;
+  }
+  
+  @Override
+  public long getOffset() {
+    return offset;
+  }
+  
+  /**
+   * {@link ResultBase#next()} calls this function to read the 
+   * actual results. 
+   */
+  protected abstract boolean nextInner() throws IOException; 
+  
+  protected T getOrCreatePersistent(T persistent) throws IOException {
+    if(persistent != null) {
+      return persistent;
+    }
+    return dataStore.newPersistent();
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStore.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStore.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStore.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,200 @@
+package org.gora.store;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.gora.persistency.BeanFactory;
+import org.gora.persistency.Persistent;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.query.Result;
+
+/**
+ * DataStore handles actual object persistence. Objects can be persisted,
+ * fetched, queried or deleted by the DataStore methods. DataStores can be
+ * constructed by an instance of {@link DataStoreFactory}.
+ *
+ * <p> DataStore implementations should be thread safe.
+ * @param <K> the class of keys in the datastore
+ * @param <T> the class of persistent objects in the datastore
+ */
+public interface DataStore<K, T extends Persistent> extends Closeable,
+  Writable, Configurable {
+
+  /**
+   * Initializes this DataStore.
+   * @param keyClass the class of the keys
+   * @param persistentClass the class of the persistent objects
+   * @param properties extra metadata
+   * @throws IOException
+   */
+  public abstract void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException;
+
+  /**
+   * Sets the class of the keys
+   * @param keyClass the class of keys
+   */
+  public abstract void setKeyClass(Class<K> keyClass);
+
+  /**
+   * Returns the class of the keys
+   * @return class of the keys
+   */
+  public abstract Class<K> getKeyClass();
+
+  /**
+   * Sets the class of the persistent objects
+   * @param persistentClass class of persistent objects
+   */
+  public abstract void setPersistentClass(Class<T> persistentClass);
+
+  /**
+   * Returns the class of the persistent objects
+   * @return class of the persistent objects
+   */
+  public abstract Class<T> getPersistentClass();
+
+  /**
+   * Returns the schema name given to this DataStore
+   * @return schema name
+   */
+  public abstract String getSchemaName();
+
+  /**
+   * Creates the optional schema or table (or similar) in the datastore
+   * to hold the objects. If the schema is already created previously,
+   * or the underlying data model does not support
+   * or need this operation, the operation is ignored.
+   */
+  public abstract void createSchema() throws IOException;
+
+  /**
+   * Deletes the underlying schema or table (or similar) in the datastore
+   * that holds the objects. This also deletes all the data associated with
+   * the schema.
+   */
+  public abstract void deleteSchema() throws IOException;
+
+  /**
+   * Deletes all the data associated with the schema, but keeps the
+   * schema (table or similar) intact.
+   */
+  public abstract void truncateSchema() throws IOException;
+
+  /**
+   * Returns whether the schema that holds the data exists in the datastore.
+   * @return whether schema exists
+   */
+  public abstract boolean schemaExists() throws IOException;
+
+  /**
+   * Returns a new instance of the key object.
+   * @return a new instance of the key object.
+   */
+  public abstract K newKey() throws IOException;
+
+  /**
+   * Returns a new instance of the managed persistent object.
+   * @return a new instance of the managed persistent object.
+   */
+  public abstract T newPersistent() throws IOException;
+
+  /**
+   * Returns the object corresponding to the given key fetching all the fields.
+   * @param key the key of the object
+   * @return the Object corresponding to the key or null if it cannot be found
+   */
+  public abstract T get(K key) throws IOException;
+
+  /**
+   * Returns the object corresponding to the given key.
+   * @param key the key of the object
+   * @param fields the fields required in the object. Pass null, to retrieve all fields
+   * @return the Object corresponding to the key or null if it cannot be found
+   */
+  public abstract T get(K key, String[] fields) throws IOException;
+
+  /**
+   * Inserts the persistent object with the given key.
+   */
+  public abstract void put(K key, T obj) throws IOException;
+
+  /**
+   * Deletes the object with the given key
+   * @param key the key of the object
+   * @return whether the object was deleted successfully
+   */
+  public abstract boolean delete(K key) throws IOException;
+
+  /**
+   * Deletes all the objects matching the query.
+   * @param query matching records to this query will be deleted
+   * @return number of deleted records
+   */
+  public abstract long deleteByQuery(Query<K, T> query) throws IOException;
+
+  /**
+   * Executes the given query and returns the results.
+   * @param query the query to execute.
+   * @return the results as a {@link Result} object.
+   */
+  public abstract Result<K,T> execute(Query<K, T> query) throws IOException;
+
+  /**
+   * Constructs and returns a new Query.
+   * @return a new Query.
+   */
+  public abstract Query<K, T> newQuery();
+
+  /**
+   * Partitions the given query and returns a list of {@link PartitionQuery}s,
+   * which will execute on local data.
+   * @param query the base query to create the partitions for. If the query
+   * is null, then the data store returns the partitions for the default query
+   * (returning every object)
+   * @return a List of PartitionQuery's
+   */
+  public abstract List<PartitionQuery<K,T>> getPartitions(Query<K,T> query)
+    throws IOException;
+
+  /**
+   * Forces the write caches to be flushed.
+   */
+  public abstract void flush() throws IOException;
+
+  /**
+   * Sets the {@link BeanFactory} to use by the DataStore.
+   * @param beanFactory the BeanFactory to use
+   */
+  public abstract void setBeanFactory(BeanFactory<K,T> beanFactory);
+
+  /**
+   * Returns the BeanFactory used by the DataStore
+   * @return the BeanFactory used by the DataStore
+   */
+  public abstract BeanFactory<K,T> getBeanFactory();
+
+  @Override
+  public abstract void close() throws IOException;
+
+  @Override
+  public Configuration getConf();
+
+  @Override
+  public void setConf(Configuration conf);
+
+  @Override
+  public void readFields(DataInput in) throws IOException;
+
+  @Override
+  public void write(DataOutput out) throws IOException;
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStoreFactory.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStoreFactory.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStoreFactory.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/store/DataStoreFactory.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,319 @@
+package org.gora.store;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.gora.persistency.Persistent;
+import org.gora.store.impl.DataStoreBase;
+import org.gora.util.ReflectionUtils;
+
+/**
+ * A Factory for {@link DataStore}s. DataStoreFactory instances are thread-safe.
+ */
+public class DataStoreFactory {
+
+  public static final Log log = LogFactory.getLog(DataStoreFactory.class);
+
+  public static final String GORA_DEFAULT_PROPERTIES_FILE = "gora.properties";
+
+  public static final String GORA_DEFAULT_DATASTORE_KEY = "gora.datastore.default";
+
+  public static final String GORA = "gora";
+
+  public static final String DATASTORE = "datastore";
+
+  private static final String GORA_DATASTORE = GORA + "." + DATASTORE + ".";
+
+  public static final String AUTO_CREATE_SCHEMA = "autocreateschema";
+
+  public static final String INPUT_PATH  = "input.path";
+
+  public static final String OUTPUT_PATH = "output.path";
+
+  public static final String MAPPING_FILE = "mapping.file";
+
+  public static final String SCHEMA_NAME = "schema.name";
+
+  private static String propertiesFile = GORA_DEFAULT_PROPERTIES_FILE;
+
+  private static String defaultDataStoreClass;
+
+  private static HashMap<Integer, DataStore<?,?>> dataStores;
+
+  public static Properties properties;
+
+  static {
+    dataStores = new HashMap<Integer, DataStore<?,?>>();
+    try {
+      readProperties();
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private DataStoreFactory() { }
+
+  private static <K, T extends Persistent> void initializeDataStore(
+      DataStore<K, T> dataStore, Class<K> keyClass, Class<T> persistent
+      , Properties properties) throws IOException {
+    dataStore.initialize(keyClass, persistent, properties);
+  }
+
+  public static <D extends DataStore<K,T>, K, T extends Persistent>
+  D createDataStore(Class<D> dataStoreClass
+      , Class<K> keyClass, Class<T> persistent) {
+    return createDataStore(dataStoreClass, keyClass, persistent, properties);
+  }
+
+  public static <D extends DataStore<K,T>, K, T extends Persistent>
+  D createDataStore(Class<D> dataStoreClass
+      , Class<K> keyClass, Class<T> persistent, String schemaName) {
+    return createDataStore(dataStoreClass, keyClass, persistent, properties, schemaName);
+  }
+
+  /**
+   * Instantiates the given data store class and initializes it with the
+   * key class, persistent class and properties. When schemaName is not
+   * null, it is first stored in the passed properties object as the
+   * default schema name for all data stores, so the caller's properties
+   * are modified.
+   * @param dataStoreClass the data store implementation to instantiate
+   * @param keyClass the class of the keys
+   * @param persistent the class of the persistent objects
+   * @param properties the properties handed to the data store
+   * @param schemaName the default schema name to set, or null to leave
+   * the properties untouched
+   * @return the initialized data store, or null if creation failed; the
+   * failure is only logged, not rethrown
+   */
+  public static <D extends DataStore<K,T>, K, T extends Persistent>
+  D createDataStore(Class<D> dataStoreClass, Class<K> keyClass
+      , Class<T> persistent, Properties properties, String schemaName) {
+    try {
+      setDefaultSchemaName(properties, schemaName);
+      D dataStore =
+        ReflectionUtils.newInstance(dataStoreClass);
+      initializeDataStore(dataStore, keyClass, persistent, properties);
+      return dataStore;
+
+    } catch (Exception ex) {
+      //any failure (including a null properties object together with a
+      //non-null schemaName) ends up here and is reported as a null return
+      log.error(StringUtils.stringifyException(ex));
+      return null;
+    }
+  }
+
+  public static <D extends DataStore<K,T>, K, T extends Persistent>
+  D createDataStore(Class<D> dataStoreClass
+      , Class<K> keyClass, Class<T> persistent, Properties properties) {
+    return createDataStore(dataStoreClass, keyClass, persistent, properties, null);
+  }
+
+  /**
+   * Returns the cached data store for the given data store, key and
+   * persistent classes, creating and caching it on first use. The method
+   * is synchronized, like the String-based overloads that delegate to it,
+   * because the cache is a plain HashMap shared by all callers.
+   * @param dataStoreClass the data store implementation
+   * @param keyClass the class of the keys
+   * @param persistentClass the class of the persistent objects
+   * @return the data store, or null if it could not be created
+   */
+  @SuppressWarnings("unchecked")
+  public static synchronized <D extends DataStore<K,T>, K, T extends Persistent>
+  D getDataStore( Class<D> dataStoreClass, Class<K> keyClass,
+      Class<T> persistentClass) {
+    //NOTE(review): the cache key is a 32-bit hash of the three classes, so two
+    //different class triples can in principle collide - confirm this is acceptable
+    int hash = getDataStoreKey(dataStoreClass, keyClass, persistentClass);
+
+    D dataStore = (D) dataStores.get(hash);
+    if(dataStore == null) {
+      dataStore = createDataStore(dataStoreClass, keyClass, persistentClass
+          , properties);
+      //createDataStore() returns null on failure; do not cache that
+      if(dataStore != null) {
+        dataStores.put(hash, dataStore);
+      }
+    }
+    return dataStore;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static synchronized <K, T extends Persistent> DataStore<K, T> getDataStore(
+      String dataStoreClass, Class<K> keyClass, Class<T> persistentClass)
+      throws ClassNotFoundException {
+
+    Class<? extends DataStore<K,T>> c
+        = (Class<? extends DataStore<K, T>>) Class.forName(dataStoreClass);
+    return getDataStore(c, keyClass, persistentClass);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static synchronized DataStore getDataStore(
+      String dataStoreClass, String keyClass, String persistentClass)
+    throws ClassNotFoundException {
+
+    Class k = Class.forName(keyClass);
+    Class p = Class.forName(persistentClass);
+    return getDataStore(dataStoreClass, k, p);
+  }
+
+  /**
+   * Returns the data store of the default data store class, as configured
+   * under the {@link #GORA_DEFAULT_DATASTORE_KEY} property, for the given
+   * key and persistent classes.
+   * @param keyClass the class of the keys
+   * @param persistent the class of the persistent objects
+   * @return the data store, or null if no default data store class is
+   * configured or the configured class cannot be loaded; the reason is
+   * logged
+   */
+  public static <K, T extends Persistent> DataStore<K, T> getDataStore(
+      Class<K> keyClass, Class<T> persistent) {
+    if(defaultDataStoreClass == null) {
+      //Class.forName(null) would fail with a bare NullPointerException
+      log.error("Default data store class is not set, make sure to include \""
+          + GORA_DEFAULT_DATASTORE_KEY + "\" property in "
+          + GORA_DEFAULT_PROPERTIES_FILE + " file");
+      return null;
+    }
+    try {
+      return getDataStore(defaultDataStoreClass, keyClass, persistent);
+    } catch (ClassNotFoundException ex) {
+      log.error(StringUtils.stringifyException(ex));
+      return null;
+    }
+  }
+
+  private static int getDataStoreKey(
+      Class<?> dataStoreClass, Class<?> keyClass, Class<?> persistent) {
+
+    long hash = (((dataStoreClass.hashCode() * 27L)
+        + keyClass.hashCode()) * 31) + persistent.hashCode();
+
+    return (int)hash;
+  }
+
+  /**
+   * Loads the Gora properties file from the classpath and installs the
+   * loaded properties through setProperties().
+   * @return the loaded properties, or null (after logging a warning) when
+   * no properties file is configured or it cannot be found on the classpath
+   * @throws IOException if the file is found but cannot be read
+   */
+  private static Properties readProperties() throws IOException {
+    InputStream stream = null;
+    if(propertiesFile != null) {
+      stream = DataStoreFactory.class.getClassLoader()
+        .getResourceAsStream(propertiesFile);
+    }
+    if(stream == null) {
+      log.warn("Gora properties are not loaded!");
+      return null;
+    }
+    try {
+      Properties loaded = new Properties();
+      loaded.load(stream);
+      setProperties(loaded);
+      return loaded;
+    } finally {
+      stream.close();
+    }
+  }
+
+  /**
+   * Tries to find a property with the given baseKey. First the property
+   * key constructed as "gora.&lt;classname&gt;.&lt;baseKey&gt;" is searched,
+   * as written and then in lowercase. If not found, the property keys for
+   * the superclasses are tried the same way, walking up the hierarchy until
+   * {@link DataStoreBase} has been tried or there is no superclass left.
+   * Lastly, the property key constructed as
+   * "gora.datastore.&lt;baseKey&gt;" is searched.
+   * @param properties the properties to search; if null, defaultValue is
+   * returned
+   * @param store the data store whose class hierarchy supplies the
+   * class names for the keys
+   * @param baseKey the last component of the property key
+   * @param defaultValue the value to return when no key matches
+   * @return the first found value, or defaultValue
+   */
+  public static String findProperty(Properties properties
+      , DataStore<?, ?> store, String baseKey, String defaultValue) {
+
+    //walk up the class hierarchy, stopping after DataStoreBase
+    for(Class<?> clazz = store.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+      String fullKey = GORA + "." + org.gora.util.StringUtils.getClassname(clazz) + "." + baseKey;
+      String value = getProperty(properties, fullKey);
+      if(value != null) {
+        return value;
+      }
+      //try once with lowercase; Locale.ROOT keeps the result independent of
+      //the default locale (under a Turkish locale "I" does not map to "i")
+      value = getProperty(properties, fullKey.toLowerCase(java.util.Locale.ROOT));
+      if(value != null) {
+        return value;
+      }
+
+      if(clazz.equals(DataStoreBase.class)) {
+        break;
+      }
+    }
+    //try with "datastore"
+    String value = getProperty(properties, GORA_DATASTORE + baseKey);
+    if(value != null) {
+      return value;
+    }
+    return defaultValue;
+  }
+
+  /**
+   * Tries to find a property with the given baseKey. First the property
+   * key constructed as "gora.&lt;classname&gt;.&lt;baseKey&gt;" is searched.
+   * If not found, the property keys for all superclasses is recursively
+   * tested. Lastly, the property key constructed as
+   * "gora.datastore.&lt;baseKey&gt;" is searched.
+   * @return the first found value, or throws IOException
+   */
+  public static String findPropertyOrDie(Properties properties
+      , DataStore<?, ?> store, String baseKey) throws IOException {
+    String val = findProperty(properties, store, baseKey, null);
+    if(val == null) {
+      throw new IOException("Property with base name \""+baseKey+"\" could not be found, make " +
+      		"sure to include this property in gora.properties file");
+    }
+    return val;
+  }
+
+  public static boolean findBooleanProperty(Properties properties
+      , DataStore<?, ?> store, String baseKey, String defaultValue) {
+    return Boolean.parseBoolean(findProperty(properties, store, baseKey, defaultValue));
+  }
+
+  public static boolean getAutoCreateSchema(Properties properties
+      , DataStore<?,?> store) {
+    return findBooleanProperty(properties, store, AUTO_CREATE_SCHEMA, "true");
+  }
+
+  /**
+   * Returns the input path as read from the properties for file-backed data stores.
+   */
+  public static String getInputPath(Properties properties, DataStore<?,?> store) {
+    return findProperty(properties, store, INPUT_PATH, null);
+  }
+
+  /**
+   * Returns the output path as read from the properties for file-backed data stores.
+   */
+  public static String getOutputPath(Properties properties, DataStore<?,?> store) {
+    return findProperty(properties, store, OUTPUT_PATH, null);
+  }
+
+  public static String getMappingFile(Properties properties, DataStore<?,?> store
+      , String defaultValue) {
+    return findProperty(properties, store, MAPPING_FILE, defaultValue);
+  }
+
+  private static void setProperties(Properties properties) {
+    defaultDataStoreClass = getProperty(properties, GORA_DEFAULT_DATASTORE_KEY);
+    DataStoreFactory.properties = properties;
+  }
+
+  private static String getProperty(Properties properties, String key) {
+    return getProperty(properties, key, null);
+  }
+
+  private static String getProperty(Properties properties, String key, String defaultValue) {
+    if (properties == null) {
+      return defaultValue;
+    }
+    String result = properties.getProperty(key);
+    if (result == null) {
+      return defaultValue;
+    }
+    return result;
+  }
+
+  /**
+   * Sets a property for all the datastores
+   */
+  private static void setProperty(Properties properties, String baseKey, String value) {
+    if(value != null)
+      properties.setProperty(GORA_DATASTORE + baseKey, value);
+  }
+
+  /**
+   * Sets a property for the datastores of the given class, under the key
+   * "gora.&lt;classname&gt;.&lt;baseKey&gt;". A null value is ignored, as
+   * in the variant that sets a property for all datastores;
+   * {@link Properties#setProperty(String, String)} would otherwise throw a
+   * NullPointerException.
+   */
+  private static<D extends DataStore<K,T>, K, T extends Persistent>
+    void setProperty(Properties properties, Class<D> dataStoreClass, String baseKey, String value) {
+    if(value != null)
+      properties.setProperty(GORA+"."+org.gora.util.StringUtils.getClassname(dataStoreClass)+"."+baseKey, value);
+  }
+
+  /**
+   * Gets the default schema name to be used by the datastore
+   */
+  public static String getDefaultSchemaName(Properties properties, DataStore<?,?> store) {
+    return findProperty(properties, store, SCHEMA_NAME, null);
+  }
+
+  /**
+   * Sets the default schema name to be used by the datastores
+   */
+  public static void setDefaultSchemaName(Properties properties, String schemaName) {
+    setProperty(properties, SCHEMA_NAME, schemaName);
+  }
+
+  /**
+   * Sets the default schema name to be used by the datastores of the given class
+   */
+  public static<D extends DataStore<K,T>, K, T extends Persistent>
+  void setDefaultSchemaName(Properties properties, Class<D> dataStoreClass, String schemaName) {
+    setProperty(properties, dataStoreClass, SCHEMA_NAME, schemaName);
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/store/FileBackedDataStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/store/FileBackedDataStore.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/store/FileBackedDataStore.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/store/FileBackedDataStore.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,30 @@
+
+package org.gora.store;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.gora.persistency.Persistent;
+
+/**
+ * FileBackedDataStore supplies the accessors needed to configure the input
+ * and output of data stores which are file based, either as paths or as
+ * streams.
+ */
+public interface FileBackedDataStore<K, T extends Persistent> extends DataStore<K, T> {
+
+  /** Sets the input path of the data store. */
+  void setInputPath(String inputPath);
+
+  /** Sets the output path of the data store. */
+  void setOutputPath(String outputPath);
+
+  /** Returns the input path of the data store. */
+  String getInputPath();
+
+  /** Returns the output path of the data store. */
+  String getOutputPath();
+
+  /** Sets the input stream of the data store. */
+  void setInputStream(InputStream inputStream);
+
+  /** Sets the output stream of the data store. */
+  void setOutputStream(OutputStream outputStream);
+
+  /** Returns the input stream of the data store. */
+  InputStream getInputStream();
+
+  /** Returns the output stream of the data store. */
+  OutputStream getOutputStream();
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/DataStoreBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/DataStoreBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/DataStoreBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/DataStoreBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,213 @@
+
+package org.gora.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.gora.avro.PersistentDatumReader;
+import org.gora.avro.PersistentDatumWriter;
+import org.gora.persistency.BeanFactory;
+import org.gora.persistency.Persistent;
+import org.gora.persistency.impl.BeanFactoryImpl;
+import org.gora.store.DataStore;
+import org.gora.store.DataStoreFactory;
+import org.gora.util.AvroUtils;
+import org.gora.util.StringUtils;
+
+/**
+ * A Base class for {@link DataStore}s. It keeps the key and persistent
+ * classes, the schema of the persistent class, the configuration and the
+ * properties the store was initialized with, and supplies default
+ * implementations for several {@link DataStore} methods.
+ */
+public abstract class DataStoreBase<K, T extends Persistent>
+implements DataStore<K, T> {
+
+  /** Factory used to create new keys and persistent objects */
+  protected BeanFactory<K, T> beanFactory;
+
+  protected Class<K> keyClass;
+  protected Class<T> persistentClass;
+
+  /** The schema of the persistent class */
+  protected Schema schema;
+
+  /** A map of field names to Field objects containing schema's fields */
+  protected Map<String, Field> fieldMap;
+
+  protected Configuration conf;
+
+  /** The auto create schema setting, read from the properties in initialize() */
+  protected boolean autoCreateSchema;
+
+  /** The properties passed to initialize() */
+  protected Properties properties;
+
+  protected PersistentDatumReader<T> datumReader;
+
+  protected PersistentDatumWriter<T> datumWriter;
+
+  public DataStoreBase() {
+  }
+
+  /**
+   * Initializes the data store: records the key and persistent classes,
+   * creates a {@link BeanFactoryImpl} unless a bean factory has already been
+   * set, obtains the schema from the bean factory's cached persistent,
+   * builds the field map, reads the auto create schema setting from the
+   * properties and creates the datum reader and writer for the schema.
+   * @param keyClass the class of the keys; ignored by setKeyClass if null
+   * @param persistentClass the class of the persistent objects
+   * @param properties the properties to configure the data store with
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    setKeyClass(keyClass);
+    setPersistentClass(persistentClass);
+    if(this.beanFactory == null) {
+      this.beanFactory = new BeanFactoryImpl<K, T>(keyClass, persistentClass);
+    }
+    schema = this.beanFactory.getCachedPersistent().getSchema();
+    fieldMap = AvroUtils.getFieldMap(schema);
+
+    autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this);
+    this.properties = properties;
+
+    datumReader = new PersistentDatumReader<T>(schema, false);
+    datumWriter = new PersistentDatumWriter<T>(schema, false);
+  }
+
+  @Override
+  public void setPersistentClass(Class<T> persistentClass) {
+    this.persistentClass = persistentClass;
+  }
+
+  @Override
+  public Class<T> getPersistentClass() {
+    return persistentClass;
+  }
+
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  /**
+   * Sets the key class. A null argument is ignored, so a previously set
+   * key class is kept.
+   */
+  @Override
+  public void setKeyClass(Class<K> keyClass) {
+    if(keyClass != null) {
+      this.keyClass = keyClass;
+    }
+  }
+
+  /**
+   * Creates a new key using the bean factory.
+   * @throws IOException wrapping any exception thrown by the bean factory
+   */
+  @Override
+  public K newKey() throws IOException {
+    try {
+      return beanFactory.newKey();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Creates a new persistent object using the bean factory.
+   * @throws IOException wrapping any exception thrown by the bean factory
+   */
+  @Override
+  public T newPersistent() throws IOException {
+    try {
+      return beanFactory.newPersistent();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  public void setBeanFactory(BeanFactory<K, T> beanFactory) {
+    this.beanFactory = beanFactory;
+  }
+
+  @Override
+  public BeanFactory<K, T> getBeanFactory() {
+    return beanFactory;
+  }
+
+  /**
+   * Returns the object for the key by delegating to get(key, fields)
+   * with a null fields argument.
+   */
+  @Override
+  public T get(K key) throws IOException {
+    return get(key, null);
+  }
+
+  /**
+   * Checks whether the fields argument is null, and if so
+   * returns all the fields of the Persistent object, else returns the
+   * argument.
+   */
+  protected String[] getFieldsToQuery(String[] fields) {
+    if(fields != null) {
+      return fields;
+    }
+    return beanFactory.getCachedPersistent().getFields();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Returns the configuration, creating and remembering a new
+   * {@link Configuration} first if none has been set.
+   */
+  protected Configuration getOrCreateConf() {
+    if(conf == null) {
+      conf = new Configuration();
+    }
+    return conf;
+  }
+
+  /**
+   * Reads the names of the key and persistent classes, loads the classes
+   * and initializes the data store with DataStoreFactory.properties.
+   * @throws IOException if one of the classes cannot be found
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    try {
+      Class<K> readKeyClass = (Class<K>) Class.forName(Text.readString(in));
+      Class<T> readPersistentClass = (Class<T>) Class.forName(Text.readString(in));
+      initialize(readKeyClass, readPersistentClass, DataStoreFactory.properties);
+
+    } catch (ClassNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Writes the names of the key and persistent classes.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Class.getName() is the form Class.forName() in readFields() expects.
+    // The canonical name differs from it for nested classes ('.' instead of
+    // '$') and is null for local and anonymous classes.
+    Text.writeString(out, getKeyClass().getName());
+    Text.writeString(out, getPersistentClass().getName());
+  }
+
+  /**
+   * Two data stores are equal if both are DataStoreBase instances and their
+   * key classes and persistent classes are equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof DataStoreBase) {
+      @SuppressWarnings("rawtypes")
+      DataStoreBase that = (DataStoreBase) obj;
+      EqualsBuilder builder = new EqualsBuilder();
+      builder.append(this.keyClass, that.keyClass);
+      builder.append(this.persistentClass, that.persistentClass);
+      return builder.isEquals();
+    }
+    return false;
+  }
+
+  /**
+   * Computes the hash code from the key class and the persistent class,
+   * the same fields equals() compares, so that equal data stores have
+   * equal hash codes.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + (keyClass == null ? 0 : keyClass.hashCode());
+    result = 31 * result + (persistentClass == null ? 0 : persistentClass.hashCode());
+    return result;
+  }
+
+  /** Default implementation deletes and recreates the schema */
+  @Override
+  public void truncateSchema() throws IOException {
+    deleteSchema();
+    createSchema();
+  }
+
+  /**
+   * Returns the name of the schema to use for the persistent class. The default schema name
+   * configured in the properties is returned if found. Else the mapping schema name is
+   * returned if it is provided. Else, the name obtained from StringUtils.getClassname() for
+   * the persistent class is returned.
+   * @param mappingSchemaName the name of the schema as read from the mapping file
+   * @param persistentClass persistent class
+   */
+  protected String getSchemaName(String mappingSchemaName, Class<?> persistentClass) {
+    String schemaName = DataStoreFactory.getDefaultSchemaName(properties, this);
+    if(schemaName != null) {
+      return schemaName;
+    }
+
+    if(mappingSchemaName != null) {
+      return mappingSchemaName;
+    }
+
+    return StringUtils.getClassname(persistentClass);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/FileBackedDataStoreBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/FileBackedDataStoreBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/FileBackedDataStoreBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/store/impl/FileBackedDataStoreBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,211 @@
+
+package org.gora.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.gora.mapreduce.GoraMapReduceUtils;
+import org.gora.persistency.Persistent;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.query.impl.FileSplitPartitionQuery;
+import org.gora.store.DataStoreFactory;
+import org.gora.store.FileBackedDataStore;
+import org.gora.util.OperationNotSupportedException;
+
+/**
+ * Base implementations for {@link FileBackedDataStore} methods.
+ */
+public abstract class FileBackedDataStoreBase<K, T extends Persistent>
+  extends DataStoreBase<K, T> implements FileBackedDataStore<K, T> {
+
+  /** Input size in bytes; set by createInputStream() */
+  protected long inputSize;
+
+  protected String inputPath;
+  protected String outputPath;
+
+  protected InputStream inputStream;
+  protected OutputStream outputStream;
+
+  /**
+   * Initializes the data store. When properties are given, an input or
+   * output path that has not been set explicitly is looked up from them.
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+    if(properties != null) {
+      if(this.inputPath == null) {
+        this.inputPath = DataStoreFactory.getInputPath(properties, this);
+      }
+      if(this.outputPath == null) {
+        this.outputPath = DataStoreFactory.getOutputPath(properties, this);
+      }
+    }
+  }
+
+  @Override
+  public void setInputPath(String inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  @Override
+  public void setOutputPath(String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  @Override
+  public String getInputPath() {
+    return inputPath;
+  }
+
+  @Override
+  public String getOutputPath() {
+    return outputPath;
+  }
+
+  @Override
+  public void setInputStream(InputStream inputStream) {
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public void setOutputStream(OutputStream outputStream) {
+    this.outputStream = outputStream;
+  }
+
+  @Override
+  public InputStream getInputStream() {
+    return inputStream;
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Opens an InputStream for the input Hadoop path and records the length
+   * of the file in inputSize.
+   */
+  protected InputStream createInputStream() throws IOException {
+    //TODO: if input path is a directory, use smt like MultiInputStream to
+    //read all the files recursively
+    Path path = new Path(inputPath);
+    // getConf() is null until setConf() is called; fall back to a new
+    // Configuration instead of handing null to the FileSystem lookup
+    FileSystem fs = path.getFileSystem(getOrCreateConf());
+    inputSize = fs.getFileStatus(path).getLen();
+    return fs.open(path);
+  }
+
+  /** Opens an OutputStream for the output Hadoop path */
+  protected OutputStream createOutputStream() throws IOException {
+    Path path = new Path(outputPath);
+    // see createInputStream() for why getOrCreateConf() is used
+    FileSystem fs = path.getFileSystem(getOrCreateConf());
+    return fs.create(path);
+  }
+
+  /**
+   * Returns the input stream, opening it from the input path first if no
+   * stream has been set or created yet.
+   */
+  protected InputStream getOrCreateInputStream() throws IOException {
+    if(inputStream == null) {
+      inputStream = createInputStream();
+    }
+    return inputStream;
+  }
+
+  /**
+   * Returns the output stream, opening it from the output path first if no
+   * stream has been set or created yet.
+   */
+  protected OutputStream getOrCreateOutputStream() throws IOException {
+    if(outputStream == null) {
+      outputStream = createOutputStream();
+    }
+    return outputStream;
+  }
+
+  /**
+   * Returns one {@link FileSplitPartitionQuery} wrapping the query for each
+   * input split that GoraMapReduceUtils.getSplits() returns for the input path.
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+      throws IOException {
+    List<InputSplit> splits = GoraMapReduceUtils.getSplits(getOrCreateConf(), inputPath);
+    List<PartitionQuery<K, T>> queries = new ArrayList<PartitionQuery<K,T>>(splits.size());
+
+    for(InputSplit split : splits) {
+      queries.add(new FileSplitPartitionQuery<K, T>(query, (FileSplit) split));
+    }
+
+    return queries;
+  }
+
+  /**
+   * Dispatches a {@link FileSplitPartitionQuery} to executePartial() and any
+   * other query to executeQuery().
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws IOException {
+    if(query instanceof FileSplitPartitionQuery) {
+      return executePartial((FileSplitPartitionQuery<K, T>) query);
+    } else {
+      return executeQuery(query);
+    }
+  }
+
+  /**
+   * Executes a normal Query reading the whole data. #execute() calls this function
+   * for non-PartitionQuery's.
+   */
+  protected abstract Result<K,T> executeQuery(Query<K,T> query)
+    throws IOException;
+
+  /**
+   * Executes a PartitionQuery, reading the data between start and end.
+   */
+  protected abstract Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query)
+    throws IOException;
+
+  /** Flushes the output stream if one is open */
+  @Override
+  public void flush() throws IOException {
+    if(outputStream != null) {
+      outputStream.flush();
+    }
+  }
+
+  /** Does nothing in this base implementation */
+  @Override
+  public void createSchema() throws IOException {
+  }
+
+  /**
+   * Not supported for file backed data stores.
+   * @throws OperationNotSupportedException always
+   */
+  @Override
+  public void deleteSchema() throws IOException {
+    throw new OperationNotSupportedException("delete schema is not supported for " +
+        "file backed data stores");
+  }
+
+  /** Always returns true in this base implementation */
+  @Override
+  public boolean schemaExists() throws IOException {
+    return true;
+  }
+
+  /**
+   * Writes the fields of the super class, then the null fields info for the
+   * input and output paths, then each of the two paths that is not null.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    org.gora.util.IOUtils.writeNullFieldsInfo(out, inputPath, outputPath);
+    if(inputPath != null) {
+      Text.writeString(out, inputPath);
+    }
+    if(outputPath != null) {
+      Text.writeString(out, outputPath);
+    }
+  }
+
+  /**
+   * Reads what write() has written: the fields of the super class, the null
+   * fields info, then the input and output paths that were not null.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    boolean[] nullFields = org.gora.util.IOUtils.readNullFieldsInfo(in);
+    if(!nullFields[0]) {
+      inputPath = Text.readString(in);
+    }
+    if(!nullFields[1]) {
+      outputPath = Text.readString(in);
+    }
+  }
+
+  /**
+   * Closes the input and output streams with Hadoop's IOUtils.closeStream()
+   * and clears both references, so that getOrCreateInputStream() and
+   * getOrCreateOutputStream() open new streams afterwards.
+   */
+  @Override
+  public void close() throws IOException {
+    IOUtils.closeStream(inputStream);
+    IOUtils.closeStream(outputStream);
+    inputStream = null;
+    outputStream = null;
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/util/AvroUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/util/AvroUtils.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/util/AvroUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/util/AvroUtils.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,51 @@
+
+package org.gora.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.reflect.ReflectData;
+import org.gora.persistency.Persistent;
+
+/**
+ * A utility class for Avro related tasks
+ */
+public class AvroUtils {
+
+  /**
+   * Builds a map from field name to Field out of the fields of the schema.
+   */
+  public static Map<String, Field> getFieldMap(Schema schema) {
+    List<Field> schemaFields = schema.getFields();
+    HashMap<String, Field> byName = new HashMap<String, Field>(schemaFields.size());
+    for(Field schemaField : schemaFields) {
+      String name = schemaField.name();
+      byName.put(name, schemaField);
+    }
+    return byName;
+  }
+
+  /**
+   * Returns the enum constant named by symbol, looked up in the class
+   * that ReflectData resolves for the schema.
+   */
+  @SuppressWarnings("unchecked")
+  public static Object getEnumValue(Schema schema, String symbol) {
+    return Enum.valueOf(ReflectData.get().getClass(schema), symbol);
+  }
+
+  /**
+   * Returns the enum constant whose symbol is at position enumOrdinal in
+   * the enum symbols of the schema.
+   */
+  public static Object getEnumValue(Schema schema, int enumOrdinal) {
+    return getEnumValue(schema, schema.getEnumSymbols().get(enumOrdinal));
+  }
+
+  /**
+   * Returns the schema of the class, read from the static field named
+   * "_SCHEMA" that the class declares.
+   */
+  public static Schema getSchema(Class<? extends Persistent> clazz)
+    throws SecurityException, NoSuchFieldException
+    , IllegalArgumentException, IllegalAccessException {
+
+    return (Schema) clazz.getDeclaredField("_SCHEMA").get(null);
+  }
+
+}



Mime
View raw message