gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1006024 [2/8] - in /incubator/gora: branches/ tags/ trunk/ trunk/bin/ trunk/conf/ trunk/docs/ trunk/gora-cassandra/ trunk/gora-cassandra/ivy/ trunk/gora-cassandra/lib-ext/ trunk/gora-cassandra/src/ trunk/gora-cassandra/src/examples/ trunk/...
Date Fri, 08 Oct 2010 21:17:17 GMT
Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,78 @@
+package org.gora.cassandra.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.gora.util.ByteUtils;
+
+/**
+ * Describes which columns a read should fetch, as a map from
+ * {@link ColumnParent} (column family, optionally with a super column) to the
+ * {@link SlicePredicate} to apply under that parent.
+ *
+ * <p>All {@code add*} methods return {@code this} so calls can be chained.</p>
+ */
+public class Select {
+
+  /**
+   * Shared predicate used by the "all columns" methods: a slice range with
+   * empty start and end bounds and a count of {@link Integer#MAX_VALUE}.
+   * The same instance is stored for every parent that selects all columns.
+   */
+  private static final SlicePredicate ALL_PREDICATE;
+
+  static {
+    ALL_PREDICATE = new SlicePredicate();
+    ALL_PREDICATE.setSlice_range(new SliceRange(new byte[0], new byte[0],
+                                                false, Integer.MAX_VALUE));
+  }
+
+  private final Map<ColumnParent, SlicePredicate> predicateMap;
+
+  public Select() {
+    predicateMap = new HashMap<ColumnParent, SlicePredicate>();
+  }
+
+  /**
+   * Returns the predicate registered for {@code columnParent}, registering a
+   * new empty one first if none exists yet.
+   */
+  private SlicePredicate getOrCreate(ColumnParent columnParent) {
+    SlicePredicate predicate = predicateMap.get(columnParent);
+    if (predicate == null) {
+      predicate = new SlicePredicate();
+      predicateMap.put(columnParent, predicate);
+    }
+    return predicate;
+  }
+
+  /**
+   * Adds {@code columnName} to the explicit column list kept for
+   * {@code parent}.
+   *
+   * @throws IllegalStateException if the predicate for {@code parent} already
+   *         carries a slice range (for example after an "all columns" call);
+   *         this guard also keeps the shared ALL_PREDICATE from being modified
+   */
+  private Select addColumnName(ColumnParent parent, String columnName) {
+    SlicePredicate predicate = getOrCreate(parent);
+    if (predicate.getSlice_range() != null) {
+      throw new IllegalStateException("Can't add columns if slice_range is not null");
+    }
+    predicate.addToColumn_names(ByteUtils.toBytes(columnName));
+    return this;
+  }
+
+  /**
+   * Selects a single column inside a super column.
+   *
+   * @param superColumnFamily name of the super column family
+   * @param superColumn name of the super column
+   * @param columnName name of the column to fetch
+   * @return this instance
+   * @throws IllegalStateException if all columns were already selected for
+   *         this super column
+   */
+  public Select addColumnName(String superColumnFamily, String superColumn,
+      String columnName) {
+    ColumnParent parent = new ColumnParent(superColumnFamily);
+    parent.setSuper_column(ByteUtils.toBytes(superColumn));
+    return addColumnName(parent, columnName);
+  }
+
+  /**
+   * Selects a single column of a column family.
+   *
+   * @param columnFamily name of the column family
+   * @param columnName name of the column to fetch
+   * @return this instance
+   * @throws IllegalStateException if all columns were already selected for
+   *         this column family
+   */
+  public Select addColumnName(String columnFamily, String columnName) {
+    return addColumnName(new ColumnParent(columnFamily), columnName);
+  }
+
+  /**
+   * Selects everything under a super column family; delegates to
+   * {@link #addColumnAll(String)}.
+   */
+  public Select addSuperColumnAll(String superColumnFamily) {
+    return addColumnAll(superColumnFamily);
+  }
+
+  /**
+   * Selects every column of one super column, replacing any predicate
+   * previously registered for that parent.
+   */
+  public Select addAllColumnsForSuperColumn(String superColumnFamily, String superColumnName) {
+    ColumnParent parent = new ColumnParent(superColumnFamily);
+    parent.setSuper_column(ByteUtils.toBytes(superColumnName));
+    predicateMap.put(parent, ALL_PREDICATE);
+    return this;
+  }
+
+  /**
+   * Selects every column of a column family, replacing any predicate
+   * previously registered for that parent.
+   */
+  public Select addColumnAll(String columnFamily) {
+    predicateMap.put(new ColumnParent(columnFamily), ALL_PREDICATE);
+    return this;
+  }
+
+  /** Returns the live (not copied) parent-to-predicate map. */
+  /*package*/ Map<ColumnParent, SlicePredicate> getPredicateMap() {
+    return predicateMap;
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,247 @@
+package org.gora.cassandra.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * {@link CassandraClient} that talks to one Cassandra node over a single
+ * Thrift socket.
+ *
+ * <p>Slice reads are submitted to a single-threaded executor that is shared by
+ * all instances. The executor is shut down when the last open client is
+ * closed (or by a JVM shutdown hook) and re-created when a new client is
+ * constructed afterwards.</p>
+ */
+public class SimpleCassandraClient implements CassandraClient {
+
+  public Cassandra.Client client;
+
+  private TTransport transport;
+
+  private String keySpace;
+
+  private ConsistencyLevel consistencyLevel;
+
+  /**
+   * Guards SERVICE, NUM_CLIENTS and the per-instance closed flag. A dedicated
+   * final object is used because SERVICE itself is reassigned, so locking on
+   * it would let two threads hold different monitors at the same time.
+   */
+  private static final Object SERVICE_LOCK = new Object();
+
+  private static ExecutorService SERVICE =
+    Executors.newSingleThreadScheduledExecutor();
+
+  private static int NUM_CLIENTS = 0;
+
+  /** True once close() has released this client's share of the executor. */
+  private boolean closed = false;
+
+  static {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        synchronized (SERVICE_LOCK) {
+          if (!SERVICE.isShutdown()) {
+            SERVICE.shutdown();
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Opens a connection using {@link ConsistencyLevel#ONE}.
+   *
+   * @throws TTransportException if the socket cannot be opened
+   */
+  public SimpleCassandraClient(String host, int port, String keySpace)
+  throws TTransportException {
+    this(host, port, keySpace, ConsistencyLevel.ONE);
+  }
+
+  /**
+   * Opens a connection to {@code host:port} and registers this client with
+   * the shared executor, re-creating the executor if it was shut down.
+   *
+   * @throws TTransportException if the socket cannot be opened
+   */
+  public SimpleCassandraClient(String host, int port,
+      String keySpace, ConsistencyLevel consistencyLevel)
+  throws TTransportException {
+    this.transport = new TSocket(host, port);
+    this.transport.open();
+    this.client = new Cassandra.Client(new TBinaryProtocol(transport));
+    setKeySpace(keySpace);
+    setConsistencyLevel(consistencyLevel);
+    synchronized (SERVICE_LOCK) {
+      NUM_CLIENTS++;
+      if (SERVICE.isShutdown()) {
+        SERVICE = Executors.newSingleThreadScheduledExecutor();
+      }
+    }
+  }
+
+  /** Returns the current shared executor, read under the lock. */
+  private static ExecutorService executor() {
+    synchronized (SERVICE_LOCK) {
+      return SERVICE;
+    }
+  }
+
+  public Cassandra.Client getClient() {
+    return client;
+  }
+
+  @Override
+  public Map<String, Map<String, String>> describeKeySpace()
+  throws IOException {
+    try {
+      return client.describe_keyspace(keySpace);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Reads one row: issues one slice read per entry of the select's predicate
+   * map and merges all returned columns into a single {@link Row}.
+   *
+   * @throws IOException if a read fails or the calling thread is interrupted
+   *         (the interrupt flag is restored before throwing)
+   */
+  @Override
+  public Row get(String key, Select select) throws IOException {
+    List<SliceGet> sliceGets =
+      new ArrayList<SliceGet>();
+    Map<ColumnParent, SlicePredicate> predicateMap = select.getPredicateMap();
+
+    for (Entry<ColumnParent, SlicePredicate> e : predicateMap.entrySet()) {
+      sliceGets.add(new SliceGet(client, keySpace, key,
+          e.getKey(), e.getValue(), consistencyLevel));
+    }
+    List<Future<Pair<ColumnParent, List<ColumnOrSuperColumn>>>> results;
+    try {
+      results = executor().invokeAll(sliceGets);
+    } catch (InterruptedException e1) {
+      // Keep the interruption visible to callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(e1);
+    }
+    Row row = new Row(key);
+    for (Future<Pair<ColumnParent, List<ColumnOrSuperColumn>>> f : results) {
+      Pair<ColumnParent, List<ColumnOrSuperColumn>> pair;
+      try {
+        pair = f.get();
+      } catch (InterruptedException e1) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e1);
+      } catch (Exception e1) {
+        throw new IOException(e1);
+      }
+      ColumnParent parent = pair.getFirst();
+      for (ColumnOrSuperColumn csc : pair.getSecond()) {
+        row.addColumnOrSuperColumn(parent.column_family, parent.super_column, csc);
+      }
+    }
+    return row;
+  }
+
+  /** Fetches up to {@code rowCount} rows between two keys. */
+  public List<Row> getRangeIntl(String startKey, String endKey, int rowCount, Select select)
+  throws IOException {
+    KeyRange range = new KeyRange(rowCount).setStart_key(startKey).setEnd_key(endKey);
+    return getKeyRange(range, select);
+  }
+
+  /** Fetches up to {@code rowCount} rows between two tokens. */
+  public List<Row> getTokenRangeIntl(String startToken, String endToken, int rowCount,
+      Select select) throws IOException {
+    KeyRange range =
+      new KeyRange(rowCount).setStart_token(startToken).setEnd_token(endToken);
+    return getKeyRange(range, select);
+  }
+
+  @Override
+  public RowIterable getRange(String startKey, String endKey, int rowCount,
+      Select select) throws IOException {
+    // TODO: Not yet implemented!!!
+    throw new NotImplementedException("Not yet implemented!");
+  }
+
+  @Override
+  public RowIterable getTokenRange(String startToken, String endToken,
+      int rowCount, Select select) throws IOException {
+    return new TokenRangeRowIterableImpl(this, startToken, endToken, rowCount, select);
+  }
+
+  /**
+   * Runs one range-slice read per entry of the select's predicate map and
+   * groups the returned key slices into rows by key.
+   *
+   * @throws IOException if a read fails or the calling thread is interrupted
+   *         (the interrupt flag is restored before throwing)
+   */
+  private List<Row> getKeyRange(KeyRange keyRange, Select select)
+  throws IOException {
+    List<RangeSliceGet> rangeSliceGets =
+      new ArrayList<RangeSliceGet>();
+    Map<ColumnParent, SlicePredicate> predicateMap = select.getPredicateMap();
+
+    for (Entry<ColumnParent, SlicePredicate> e : predicateMap.entrySet()) {
+      rangeSliceGets.add(new RangeSliceGet(client, keySpace, keyRange,
+          e.getKey(), e.getValue(), consistencyLevel));
+    }
+    List<Future<Pair<ColumnParent, List<KeySlice>>>> results;
+    try {
+      results = executor().invokeAll(rangeSliceGets);
+    } catch (InterruptedException e1) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e1);
+    }
+
+    Map<String, Row> rowMap = new HashMap<String, Row>();
+
+    for (Future<Pair<ColumnParent, List<KeySlice>>> keySlicesTask : results) {
+      Pair<ColumnParent, List<KeySlice>> keySlices;
+      try {
+        keySlices = keySlicesTask.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      for (KeySlice keySlice : keySlices.getSecond()) {
+        addKeySliceToRowMap(rowMap, keySlices.getFirst(), keySlice);
+      }
+    }
+
+    return new ArrayList<Row>(rowMap.values());
+  }
+
+  /**
+   * Adds the columns of {@code keySlice} to the row stored under its key in
+   * {@code rowMap}, creating the row on first use.
+   */
+  private void addKeySliceToRowMap(Map<String, Row> rowMap, ColumnParent parent,
+      KeySlice keySlice) {
+    Row row = rowMap.get(keySlice.key);
+    if (row == null) {
+      row = new Row(keySlice.key);
+      rowMap.put(row.getKey(), row);
+    }
+    for (ColumnOrSuperColumn csc : keySlice.columns) {
+      row.addColumnOrSuperColumn(parent.column_family, parent.super_column, csc);
+    }
+  }
+
+  @Override
+  public void mutate(String key, Mutate mutation) throws IOException {
+    Map<String, Map<String, List<Mutation>>> rowMutations =
+      new HashMap<String, Map<String,List<Mutation>>>();
+    rowMutations.put(key, mutation.getMutationMap());
+    try {
+      client.batch_mutate(keySpace, rowMutations, consistencyLevel);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void setKeySpace(String keySpace) {
+    this.keySpace = keySpace;
+  }
+
+  @Override
+  public void setConsistencyLevel(ConsistencyLevel level) {
+    this.consistencyLevel = level;
+  }
+
+  /**
+   * Closes the transport and releases this client's share of the executor,
+   * shutting the executor down when no open clients remain. Repeated calls
+   * release the share only once, so closing one client twice cannot shut the
+   * executor down under other clients that are still open.
+   */
+  @Override
+  public void close() {
+    transport.close();
+    synchronized (SERVICE_LOCK) {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      NUM_CLIENTS--;
+      if (NUM_CLIENTS == 0 && !SERVICE.isShutdown()) {
+        SERVICE.shutdown();
+      }
+    }
+  }
+
+  @Override
+  public List<TokenRange> describeRing() throws IOException {
+    try {
+      return client.describe_ring(keySpace);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public List<String> describeSplits(String startToken, String endToken,
+      int size) throws IOException {
+    try {
+      return client.describe_splits(startToken, endToken, size);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,39 @@
+package org.gora.cassandra.client;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.SlicePredicate;
+
+/**
+ * Task that performs one {@code get_slice} call and returns the result
+ * together with the {@link ColumnParent} it was issued for.
+ *
+ * <p>All fields are final: instances are created on one thread and executed
+ * on another, and final fields are safely visible to the executing thread.</p>
+ */
+public class SliceGet implements Callable<Pair<ColumnParent, List<ColumnOrSuperColumn>>> {
+
+  private final Cassandra.Client client;
+  private final String keySpace;
+  private final String key;
+  private final ColumnParent parent;
+  private final SlicePredicate predicate;
+  private final ConsistencyLevel consistencyLevel;
+
+  /**
+   * @param client Thrift client used to issue the read
+   * @param keySpace keyspace passed to {@code get_slice}
+   * @param key row key to read
+   * @param parent column family (and optional super column) to read under
+   * @param predicate columns to select under {@code parent}
+   * @param consistencyLevel consistency level passed to {@code get_slice}
+   */
+  public SliceGet(Cassandra.Client client, String keySpace, String key,
+      ColumnParent parent, SlicePredicate predicate,
+      ConsistencyLevel consistencyLevel) {
+    this.client = client;
+    this.keySpace = keySpace;
+    this.key = key;
+    this.parent = parent;
+    this.predicate = predicate;
+    this.consistencyLevel = consistencyLevel;
+  }
+
+  /**
+   * Executes the read.
+   *
+   * @return the parent paired with the columns returned by {@code get_slice}
+   * @throws Exception whatever the Thrift call throws
+   */
+  @Override
+  public Pair<ColumnParent, List<ColumnOrSuperColumn>> call()
+  throws Exception {
+    return new Pair<ColumnParent, List<ColumnOrSuperColumn>>(parent,
+        client.get_slice(keySpace, key, parent, predicate, consistencyLevel));
+  }
+
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,92 @@
+package org.gora.cassandra.client;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link RowIterable} that walks a token range in batches of
+ * {@code batchCount} rows, fetching the next batch when the current one has
+ * been consumed.
+ *
+ * <p>The batch position is kept on this object, not on the iterators it hands
+ * out, so all iterators obtained from one instance share the same position.</p>
+ */
+class TokenRangeRowIterableImpl implements RowIterable {
+
+  private static final Log LOG = LogFactory.getLog(TokenRangeRowIterableImpl.class);
+
+  private final SimpleCassandraClient client;
+
+  private String startToken;
+
+  private String endToken;
+
+  private final int batchCount;
+
+  private Select select;
+
+  /** Current batch, or null when no batch is loaded or the range is exhausted. */
+  private List<Row> rows;
+
+  /** Index into {@link #rows} of the next row to return. */
+  private int rowIndex;
+
+  TokenRangeRowIterableImpl(SimpleCassandraClient client,
+      String startToken, String endToken, int batchCount,
+      Select select) {
+    this.client = client;
+    this.startToken = startToken;
+    this.endToken = endToken;
+    this.batchCount = batchCount;
+    this.select = select;
+    this.rows = null;
+    this.rowIndex = 0;
+  }
+
+  /**
+   * Makes sure a batch with at least one unread row is loaded, if one is
+   * available. After the call {@code rows} is null when the query returned no
+   * rows.
+   *
+   * @throws IOException if fetching the next batch fails
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void maybeInit() throws IOException {
+    // Drop a fully consumed batch so the next one is fetched below.
+    if (rows != null && rowIndex >= rows.size()) {
+      rows = null;
+    }
+
+    if (rows != null) {
+      return;
+    }
+
+    rows = client.getTokenRangeIntl(startToken, endToken, batchCount, select);
+
+    if (rows.isEmpty()) {
+      rows = null;
+      return;
+    }
+
+    rowIndex = 0;
+    // Continue the next batch from the token of the last row just fetched.
+    Row lastRow = rows.get(rows.size() - 1);
+    IPartitioner p = DatabaseDescriptor.getPartitioner();
+    startToken = p.getTokenFactory().toString(p.getToken(lastRow.getKey()));
+  }
+
+  @Override
+  public Iterator<Row> iterator() {
+    return new Iterator<Row>() {
+      /**
+       * Loads the next batch when needed. An I/O failure is logged as a
+       * warning and reported as "no more rows".
+       */
+      @Override
+      public boolean hasNext() {
+        try {
+          maybeInit();
+        } catch (IOException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+          return false;
+        }
+        return rows != null;
+      }
+
+      /**
+       * Returns the next row.
+       *
+       * @throws java.util.NoSuchElementException if no row is left, including
+       *         when this is called without a preceding {@code hasNext()}
+       */
+      @Override
+      public Row next() {
+        // Checking here keeps next() safe on its own and past the end of a batch.
+        if (!hasNext()) {
+          throw new java.util.NoSuchElementException();
+        }
+        return rows.get(rowIndex++);
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,76 @@
+package org.gora.cassandra.query;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.impl.PartitionQueryImpl;
+
+/**
+ * Partition query that carries the token range, endpoint list and split size
+ * of one partition in addition to the state of its base query.
+ */
+public class CassandraPartitionQuery<K, T extends Persistent>
+extends PartitionQueryImpl<K, T> {
+
+  private String startToken;
+
+  private String endToken;
+
+  private String[] endPoints;
+
+  private int splitSize;
+
+  /** No-argument constructor; clears the data store and leaves all partition fields unset. */
+  public CassandraPartitionQuery() {
+    this.dataStore = null;
+  }
+
+  /**
+   * @param baseQuery query this partition is derived from
+   * @param startToken start token of the partition
+   * @param endToken end token of the partition
+   * @param endPoints endpoints associated with the partition; the array is stored as is
+   * @param splitSize split size recorded for the partition
+   */
+  public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
+      int splitSize) {
+    super(baseQuery);
+    this.startToken = startToken;
+    this.endToken = endToken;
+    this.endPoints = endPoints;
+    this.splitSize = splitSize;
+  }
+
+  /**
+   * Writes the superclass state, then both tokens, the endpoint count
+   * followed by each endpoint, and finally the split size.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, startToken);
+    Text.writeString(out, endToken);
+    out.writeInt(endPoints.length);
+    int idx = 0;
+    while (idx < endPoints.length) {
+      Text.writeString(out, endPoints[idx]);
+      idx++;
+    }
+    out.writeInt(splitSize);
+  }
+
+  /** Reads the fields back in the order in which {@link #write(DataOutput)} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    startToken = Text.readString(in);
+    endToken = Text.readString(in);
+    endPoints = new String[in.readInt()];
+    for (int idx = 0; idx < endPoints.length; idx++) {
+      endPoints[idx] = Text.readString(in);
+    }
+    splitSize = in.readInt();
+  }
+
+  public String getStartToken() {
+    return startToken;
+  }
+
+  public String getEndToken() {
+    return endToken;
+  }
+
+  /** Returns the internal endpoint array itself, not a copy. */
+  public String[] getEndPoints() {
+    return endPoints;
+  }
+
+  public int getSplitSize() {
+    return splitSize;
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,17 @@
+package org.gora.cassandra.query;
+
+import org.gora.persistency.Persistent;
+import org.gora.query.impl.QueryBase;
+import org.gora.store.DataStore;
+
+/**
+ * Query type for the Cassandra store; adds nothing to {@link QueryBase}
+ * beyond its two constructors.
+ */
+public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
+
+  /** Creates a query bound to {@code dataStore}. */
+  public CassandraQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+  /** Creates a query with a {@code null} data store. */
+  public CassandraQuery() {
+    this(null);
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,140 @@
+package org.gora.cassandra.query;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+
+import org.gora.cassandra.client.CassandraClient;
+import org.gora.cassandra.client.Row;
+import org.gora.cassandra.client.Select;
+import org.gora.cassandra.store.CassandraStore;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.impl.ResultBase;
+import org.gora.store.DataStore;
+
+/**
+ * Result that iterates over the rows returned by a Cassandra range read and
+ * converts each row into a key and a persistent object.
+ */
+public class CassandraResult<K, T extends Persistent>
+extends ResultBase<K, T> {
+
+  private Iterator<Row> rowIter;
+
+  private CassandraStore<K, T> store;
+
+  private String[] fields;
+
+  /**
+   * Starts the read for {@code query}. A {@link CassandraPartitionQuery} is
+   * read by token range; any other query is cast to {@link CassandraQuery}
+   * and read by key range.
+   *
+   * @param dataStore store the query runs against; must be a {@link CassandraStore}
+   * @param query query to execute
+   * @param batchRowCount row count passed to the client's range call
+   * @throws IOException if the range read cannot be started or an endpoint
+   *         host name of a partition query cannot be resolved
+   */
+  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
+      int batchRowCount) throws IOException {
+    super(dataStore, query);
+
+    store = (CassandraStore<K, T>) dataStore;
+    fields = query.getFields();
+
+    boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
+    String startTokenOrKey;
+    String endTokenOrKey;
+
+    if (isUsingTokens) {
+      CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
+      startTokenOrKey = partitionQuery.getStartToken();
+      endTokenOrKey = partitionQuery.getEndToken();
+    } else {
+      CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
+      // NOTE(review): a null start or end key fails here with a
+      // NullPointerException -- confirm whether open-ended queries must be supported.
+      startTokenOrKey = cassandraQuery.getStartKey().toString();
+      endTokenOrKey = cassandraQuery.getEndKey().toString();
+    }
+
+    Select select = store.createSelect(fields);
+
+    CassandraClient client = store.getClientByLocation(getLocation(query));
+    if (isUsingTokens) {
+      rowIter =
+        client.getTokenRange(startTokenOrKey, endTokenOrKey,
+            batchRowCount, select).iterator();
+    } else {
+      rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
+          batchRowCount, select).iterator();
+    }
+  }
+
+  /** Progress is not tracked; always returns 0. */
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  /**
+   * Advances to the next row and fills {@code key} and {@code persistent}
+   * from it.
+   *
+   * @return false when the iterator has no more rows or yields a null row
+   */
+  @Override
+  protected boolean nextInner() throws IOException {
+    if (!rowIter.hasNext()) {
+      return false;
+    }
+    Row row = rowIter.next();
+    if (row == null) {
+      return false;
+    }
+
+    key = toKey(row.getKey());
+    persistent = store.newInstance(row, fields);
+
+    return true;
+  }
+
+  /**
+   * Converts the string row key to the data store's key class. Supported
+   * targets are String and the boxed Integer, Float, Double, Long, Short and
+   * Byte types, checked in that order.
+   *
+   * @throws NumberFormatException if the key is not parseable as the numeric type
+   * @throws RuntimeException if the key class is none of the supported types
+   */
+  @SuppressWarnings("unchecked")
+  private K toKey(String keyStr) {
+    Class<K> keyClass = dataStore.getKeyClass();
+    if (keyClass.isAssignableFrom(String.class)) {
+      return (K) keyStr;
+    }
+    if (keyClass.isAssignableFrom(Integer.class)) {
+      return (K) (Integer) Integer.parseInt(keyStr);
+    }
+    if (keyClass.isAssignableFrom(Float.class)) {
+      return (K) (Float) Float.parseFloat(keyStr);
+    }
+    if (keyClass.isAssignableFrom(Double.class)) {
+      return (K) (Double) Double.parseDouble(keyStr);
+    }
+    if (keyClass.isAssignableFrom(Long.class)) {
+      return (K) (Long) Long.parseLong(keyStr);
+    }
+    if (keyClass.isAssignableFrom(Short.class)) {
+      return (K) (Short) Short.parseShort(keyStr);
+    }
+    if (keyClass.isAssignableFrom(Byte.class)) {
+      return (K) (Byte) Byte.parseByte(keyStr);
+    }
+
+    throw new RuntimeException("Can't parse " + keyStr +
+                               " as an instance of " + keyClass);
+  }
+
+  @Override
+  public void close() throws IOException { }
+
+  /**
+   * Picks the endpoint to read a partition from: the first endpoint whose
+   * address equals one of this host's addresses, otherwise the first endpoint
+   * in the list.
+   *
+   * @return the chosen endpoint, or null when the query is not a partition
+   *         query or carries no endpoints
+   * @throws UnknownHostException if the local host or an endpoint name cannot
+   *         be resolved; the constructor surfaces this as an IOException
+   */
+  private String getLocation(Query<K, T> query) throws UnknownHostException {
+    if (!(query instanceof CassandraPartitionQuery)) {
+      return null;
+    }
+    CassandraPartitionQuery<K, T> partitionQuery =
+      (CassandraPartitionQuery<K, T>) query;
+    String[] endPoints = partitionQuery.getEndPoints();
+    if (endPoints == null || endPoints.length == 0) {
+      // No endpoint to prefer: return the same value as for non-partition queries.
+      return null;
+    }
+    InetAddress[] localAddresses =
+      InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
+    for (InetAddress address : localAddresses) {
+      for (String location : endPoints) {
+        if (address.equals(InetAddress.getByName(location))) {
+          return location;
+        }
+      }
+    }
+    return endPoints[0];
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,55 @@
+package org.gora.cassandra.store;
+
+/**
+ * Value object naming a column by family, optional super column and column
+ * name. Equality and hash code take all three parts into account; any part
+ * may be null.
+ */
+class CassandraColumn {
+  String family;
+  String superColumn;
+  String column;
+
+  public CassandraColumn(String family, String superColumn, String column) {
+    this.family = family;
+    this.superColumn = superColumn;
+    this.column = column;
+  }
+
+  /** Returns true when a super column name is set. */
+  public boolean isSuperColumn() {
+    return superColumn != null;
+  }
+
+  /**
+   * Hash over column, family and super column, in that order, using the
+   * usual 31-multiplier scheme with 0 for null parts.
+   */
+  @Override
+  public int hashCode() {
+    return java.util.Arrays.hashCode(
+        new Object[] { column, family, superColumn });
+  }
+
+  /** Two columns are equal when they share a class and all three parts match. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    CassandraColumn that = (CassandraColumn) obj;
+    return sameText(column, that.column)
+        && sameText(family, that.family)
+        && sameText(superColumn, that.superColumn);
+  }
+
+  /** Null-safe string comparison: two nulls are equal, null never equals non-null. */
+  private static boolean sameText(String left, String right) {
+    return left == null ? right == null : left.equals(right);
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,33 @@
+package org.gora.cassandra.store;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Holds the keyspace name and the set of known column families, each flagged
+ * as super or standard.
+ */
+public class CassandraMapping {
+
+  private String keySpace;
+
+  /** Column family name mapped to true when the family is a super column family. */
+  private final Map<String, Boolean> families =
+    new HashMap<String, Boolean>();
+
+  public String getKeySpace() {
+    return keySpace;
+  }
+
+  public void setKeySpace(String keySpace) {
+    this.keySpace = keySpace;
+  }
+
+  /** Returns the key-set view of the registered column families. */
+  public Set<String> getColumnFamilies() {
+    return families.keySet();
+  }
+
+  /**
+   * Registers a column family, replacing any earlier registration under the
+   * same name.
+   *
+   * @param columnFamily name of the column family
+   * @param isSuper true if the family is a super column family
+   */
+  public void addColumnFamily(String columnFamily, boolean isSuper) {
+    families.put(columnFamily, isSuper);
+  }
+
+  /**
+   * Tells whether a registered column family is a super column family.
+   *
+   * @param columnFamily name of a column family added with
+   *        {@link #addColumnFamily(String, boolean)}
+   * @return true if the family was registered as super
+   * @throws IllegalArgumentException if the family was never registered
+   */
+  public boolean isColumnFamilySuper(String columnFamily) {
+    Boolean isSuper = families.get(columnFamily);
+    if (isSuper == null) {
+      // Unboxing null would surface as a bare NullPointerException; name the culprit instead.
+      throw new IllegalArgumentException(
+          "Unknown column family: " + columnFamily);
+    }
+    return isSuper.booleanValue();
+  }
+}

Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java (added)
+++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,448 @@
+package org.gora.cassandra.store;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+import org.apache.cassandra.thrift.TokenRange;
+import org.gora.cassandra.client.CassandraClient;
+import org.gora.cassandra.client.Mutate;
+import org.gora.cassandra.client.Row;
+import org.gora.cassandra.client.Select;
+import org.gora.cassandra.client.SimpleCassandraClient;
+import org.gora.cassandra.query.CassandraPartitionQuery;
+import org.gora.cassandra.query.CassandraQuery;
+import org.gora.cassandra.query.CassandraResult;
+import org.gora.persistency.ListGenericArray;
+import org.gora.persistency.Persistent;
+import org.gora.persistency.State;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.StatefulMap;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.store.DataStoreFactory;
+import org.gora.store.impl.DataStoreBase;
+import org.gora.util.ByteUtils;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+
+/**
+ * DataStore for Cassandra.
+ *
+ * <p> Note: CassandraStore is not thread-safe. </p>
+ */
+public class CassandraStore<K, T extends Persistent>
+extends DataStoreBase<K, T> {
+
+  /** Message of the exception thrown by createSchema and deleteSchema. */
+  private static final String ERROR_MESSAGE =
+    "Cassandra does not support creating or modifying ColumnFamilies during runtime";
+
+  /** Default mapping file name handed to DataStoreFactory.getMappingFile. */
+  private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
+
+  /** Size argument passed to describeSplits and to each CassandraPartitionQuery. */
+  private static final int SPLIT_SIZE = 65536;
+
+  /** Passed to CassandraResult in execute (presumably rows per batch - confirm). */
+  private static final int BATCH_COUNT = 256;
+
+  /** Created by readMapping once a matching class mapping is found; null until then. */
+  private CassandraClient client;
+
+  /** Persistent field name -> Cassandra location parsed from the mapping file. */
+  private Map<String, CassandraColumn> columnMap;
+
+  /** Keyspace name and column family types, filled in by readMapping. */
+  private CassandraMapping mapping;
+
+  /**
+   * Initializes the base store, then resolves the mapping file name from the
+   * properties (falling back to the default name) and loads it.
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+    readMapping(
+        DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE));
+  }
+
+  /** Returns the keyspace name held by the mapping as the schema name. */
+  @Override
+  public String getSchemaName() {
+    return mapping.getKeySpace();
+  }
+
+  /**
+   * Not supported by this store.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void createSchema() throws IOException {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  /**
+   * Not supported by this store.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void deleteSchema() throws IOException {
+    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  }
+
+  /** Always reports true; no check against the server is made here. */
+  @Override
+  public boolean schemaExists() throws IOException {
+    return true;
+  }
+
+  /** Returns this store's single client; the endPoint argument is not consulted. */
+  public CassandraClient getClientByLocation(String endPoint) {
+    return client;
+  }
+
+  public Select createSelect(String[] fields) {
+    Select select = new Select();
+    if (fields == null) {
+      fields = beanFactory.getCachedPersistent().getFields();
+    }
+    for (String f : fields) {
+      CassandraColumn col = columnMap.get(f);
+      Schema fieldSchema = fieldMap.get(f).schema();
+      switch (fieldSchema.getType()) {
+        case MAP:
+        case ARRAY:
+          if (col.isSuperColumn()) {
+            select.addAllColumnsForSuperColumn(col.family, col.superColumn);
+          } else {
+            select.addColumnAll(col.family);
+          }
+          break;
+        default:
+          if (col.isSuperColumn()) {
+            select.addColumnName(col.family, col.superColumn, col.column);
+          } else {
+            select.addColumnName(col.family, col.column);
+          }
+          break;
+      }
+    }
+    return select;
+  }
+
+  /**
+   * Fetches the row stored under {@code key.toString()} and materializes the
+   * requested fields.
+   *
+   * @param key row key
+   * @param fields field names to load, or null for all fields
+   * @return the loaded object, or null when the client returns no row
+   * @throws IOException wrapping any exception raised while reading or decoding
+   */
+  @Override
+  public T get(K key, String[] fields) throws IOException {
+    final String[] wanted = (fields != null)
+        ? fields
+        : beanFactory.getCachedPersistent().getFields();
+    final Select select = createSelect(wanted);
+    try {
+      return newInstance(client.get(key.toString(), select), wanted);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Stores an already-built map at the field's position in the persistent object. */
+  @SuppressWarnings("rawtypes")
+  private void setField(T persistent, Field field, StatefulMap map) {
+    persistent.put(field.pos(), map);
+  }
+
+  /**
+   * Decodes raw bytes with the field's schema and stores the result at the
+   * field's position; the field's current value is handed to the decoder as
+   * its last argument.
+   */
+  private void setField(T persistent, Field field, byte[] val)
+  throws IOException {
+    persistent.put(field.pos()
+        , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
+  }
+
+  /** Stores an already-built array at the field's position in the persistent object. */
+  @SuppressWarnings("rawtypes")
+  private void setField(T persistent, Field field, GenericArray list) {
+    persistent.put(field.pos(), list);
+  }
+
+  /**
+   * Builds a persistent object from a fetched row.
+   *
+   * <p> Map and array fields are rebuilt from every cell of their super column
+   * (or of the whole family when it is not super); other fields are decoded
+   * from their single mapped cell. Fields with no data in the row are left
+   * untouched. All dirty flags are cleared before returning. </p>
+   *
+   * @param result the fetched row, may be null
+   * @param fields names of the fields to populate
+   * @return the populated object, or null when {@code result} is null
+   * @throws IOException if a value cannot be decoded
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public T newInstance(Row result, String[] fields)
+  throws IOException {
+    if (result == null) {
+      return null;
+    }
+
+    T persistent = newPersistent();
+    StateManager stateManager = persistent.getStateManager();
+    for (String name : fields) {
+      CassandraColumn col = columnMap.get(name);
+      Field field = fieldMap.get(name);
+      Schema fieldSchema = field.schema();
+      Type type = fieldSchema.getType();
+
+      if (type == Type.MAP || type == Type.ARRAY) {
+        // Both container kinds read the same set of cells.
+        Map<String, byte[]> cells;
+        if (col.isSuperColumn()) {
+          cells = result.getSuperColumn(col.family, col.superColumn);
+        } else {
+          cells = result.getColumn(col.family);
+        }
+        if (cells == null) {
+          continue;
+        }
+
+        if (type == Type.MAP) {
+          Schema valueSchema = fieldSchema.getValueType();
+          StatefulMap map = new StatefulHashMap();
+          for (Entry<String, byte[]> cell : cells.entrySet()) {
+            Utf8 mapKey = new Utf8(cell.getKey());
+            map.put(mapKey, ByteUtils.fromBytes(cell.getValue(), valueSchema, datumReader, null));
+            map.putState(mapKey, State.CLEAN);
+          }
+          setField(persistent, field, map);
+        } else {
+          // NOTE(review): element order follows the iteration order of the
+          // returned map - confirm it matches the index-named columns.
+          Schema elementSchema = fieldSchema.getElementType();
+          ArrayList elements = new ArrayList();
+          for (Entry<String, byte[]> cell : cells.entrySet()) {
+            elements.add(ByteUtils.fromBytes(cell.getValue(), elementSchema, datumReader, null));
+          }
+          ListGenericArray arr = new ListGenericArray(fieldSchema, elements);
+          setField(persistent, field, arr);
+        }
+      } else {
+        byte[] val;
+        if (col.isSuperColumn()) {
+          val = result.get(col.family, col.superColumn, col.column);
+        } else {
+          val = result.get(col.family, col.column);
+        }
+        if (val != null) {
+          setField(persistent, field, val);
+        }
+      }
+    }
+    stateManager.clearDirty(persistent);
+    return persistent;
+  }
+
+  /**
+   * Writes the dirty fields of {@code obj} to the row {@code key.toString()}.
+   *
+   * <p> Map fields backed by a StatefulMap write only their DIRTY entries and
+   * delete their DELETED entries; other maps write every entry. Array fields
+   * write each element under its index as column name. All other fields are
+   * written to their single mapped column. Nothing is sent when no field
+   * produced a mutation. </p>
+   *
+   * <p> Map values and array elements are encoded with the field's value or
+   * element schema, the same schemas newInstance decodes them with. </p>
+   *
+   * @param key row key
+   * @param obj object whose dirty fields are persisted
+   * @throws IOException if a value cannot be encoded or the client call fails
+   */
+  @Override
+  public void put(K key, T obj) throws IOException {
+    Mutate mutate = new Mutate();
+    Schema schema = obj.getSchema();
+    StateManager stateManager = obj.getStateManager();
+    List<Field> fields = schema.getFields();
+    String qual;
+    byte[] value;
+    for (int i = 0; i < fields.size(); i++) {
+      if (!stateManager.isDirty(obj, i)) {
+        continue;
+      }
+      Field field = fields.get(i);
+      Type type = field.schema().getType();
+      Object o = obj.get(i);
+      CassandraColumn col = columnMap.get(field.name());
+
+      switch(type) {
+      case MAP: {
+        Schema valueSchema = field.schema().getValueType();
+        if(o instanceof StatefulMap) {
+          @SuppressWarnings("unchecked")
+          StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
+          for (Entry<Utf8, State> e : map.states().entrySet()) {
+            Utf8 mapKey = e.getKey();
+            switch (e.getValue()) {
+            case DIRTY:
+              qual = mapKey.toString();
+              value = ByteUtils.toBytes(map.get(mapKey), valueSchema, datumWriter);
+              if (col.isSuperColumn()) {
+                mutate.put(col.family, col.superColumn, qual, value);
+              } else {
+                mutate.put(col.family, qual, value);
+              }
+              break;
+            case DELETED:
+              qual = mapKey.toString();
+              if (col.isSuperColumn()) {
+                mutate.delete(col.family, col.superColumn, qual);
+              } else {
+                mutate.delete(col.family, qual);
+              }
+              break;
+            }
+          }
+        } else {
+          @SuppressWarnings({ "rawtypes", "unchecked" })
+          Set<Map.Entry> set = ((Map)o).entrySet();
+          for(@SuppressWarnings("rawtypes") Entry entry: set) {
+            qual = entry.getKey().toString();
+            // Encode with the value schema so the read path can decode it.
+            value = ByteUtils.toBytes(entry.getValue(), valueSchema, datumWriter);
+            if (col.isSuperColumn()) {
+              mutate.put(col.family, col.superColumn, qual, value);
+            } else {
+              mutate.put(col.family, qual, value);
+            }
+          }
+        }
+        break;
+      }
+      case ARRAY:
+        if(o instanceof GenericArray) {
+          Schema elementSchema = field.schema().getElementType();
+          @SuppressWarnings("rawtypes")
+          GenericArray arr = (GenericArray) o;
+          int j=0;
+          for(Object item : arr) {
+            // Encode with the element schema so the read path can decode it.
+            value = ByteUtils.toBytes(item, elementSchema, datumWriter);
+            if (col.isSuperColumn()) {
+              mutate.put(col.family, col.superColumn, Integer.toString(j), value);
+            } else {
+              mutate.put(col.family, Integer.toString(j), value);
+            }
+            j++;
+          }
+        }
+        break;
+      default:
+        value = ByteUtils.toBytes(o, field.schema(), datumWriter);
+        if (col.isSuperColumn()) {
+          mutate.put(col.family, col.superColumn, col.column, value);
+        } else {
+          mutate.put(col.family, col.column, value);
+        }
+        break;
+      }
+    }
+
+    if(!mutate.isEmpty())
+      client.mutate(key.toString(), mutate);
+  }
+
+  /**
+   * Issues a delete-all for every column family of the mapping on the row
+   * {@code key.toString()}.
+   *
+   * @return always true; whether the row existed is not checked
+   */
+  @Override
+  public boolean delete(K key) throws IOException {
+    final Mutate rowRemoval = new Mutate();
+    final Set<String> families = mapping.getColumnFamilies();
+    for (String family : families) {
+      rowRemoval.deleteAll(family);
+    }
+    client.mutate(key.toString(), rowRemoval);
+    return true;
+  }
+
+  /** No-op: put and delete hand their mutations to the client directly. */
+  @Override
+  public void flush() throws IOException { }
+
+  /**
+   * Closes the underlying client, if one was created.
+   *
+   * <p> The client only exists once readMapping has found a mapping for this
+   * store's key and persistent classes, so it may still be null here. </p>
+   */
+  @Override
+  public void close() throws IOException {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  /** Creates a new CassandraQuery bound to this store. */
+  @Override
+  public Query<K, T> newQuery() {
+    return new CassandraQuery<K, T>(this);
+  }
+
+  /**
+   * Not implemented yet: deletes nothing.
+   *
+   * @return always 0
+   */
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  /** Wraps the query in a CassandraResult constructed with BATCH_COUNT. */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws IOException {
+    return new CassandraResult<K, T>(this, query, BATCH_COUNT);
+  }
+
+  /**
+   * Splits the query into one partition query per pair of consecutive split
+   * tokens, for every token range reported by the ring.
+   *
+   * @param query the query to partition
+   * @return the partition queries, each carrying the host names of its range
+   * @throws IOException if the ring cannot be described or a host cannot be resolved
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+  throws IOException {
+    List<PartitionQuery<K,T>> result = new ArrayList<PartitionQuery<K,T>>();
+
+    for (TokenRange range : client.describeRing()) {
+      List<String> tokens =
+        client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
+
+      // hadoop needs hostname, not ip
+      String[] hosts = range.endpoints.toArray(new String[range.endpoints.size()]);
+      for (int h = 0; h < hosts.length; h++) {
+        hosts[h] = InetAddress.getByName(hosts[h]).getHostName();
+      }
+
+      // each pair of neighbouring tokens bounds one sub-range
+      for (int t = 0; t + 1 < tokens.size(); t++) {
+        String startToken = tokens.get(t);
+        String endToken = tokens.get(t + 1);
+        result.add(new CassandraPartitionQuery<K, T>(
+            query, startToken, endToken, hosts, SPLIT_SIZE));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates a client for the first server of the comma-separated "servers"
+   * property, which must have the form {@code host:port}. Surrounding
+   * whitespace is ignored.
+   *
+   * @return a client connected to the mapping's keyspace
+   * @throws IOException if the address is malformed or the client cannot be created
+   */
+  private CassandraClient createClient() throws IOException {
+    String serverStr =
+      DataStoreFactory.findPropertyOrDie(properties, this, "servers");
+    // Only the first entry of the list is used.
+    String[] servers = serverStr.split(",");
+    if (servers.length == 0) {
+      throw new IOException("No Cassandra server given in '" + serverStr + "'");
+    }
+    String firstServer = servers[0].trim();
+    String[] hostAndPort = firstServer.split(":");
+    if (hostAndPort.length < 2) {
+      throw new IOException("Invalid Cassandra server address '" + firstServer
+          + "', expected host:port");
+    }
+    try {
+      return new SimpleCassandraClient(hostAndPort[0].trim(),
+          Integer.parseInt(hostAndPort[1].trim()), mapping.getKeySpace());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Loads the mapping for this store's key and persistent classes from an XML
+   * resource on the classpath.
+   *
+   * <p> For the first matching {@code <class>} element this sets the keyspace,
+   * creates the client, records every column family reported by the client
+   * together with its type, and maps each {@code <field>} to its location.
+   * A field path is {@code family[:column]} for a standard family and
+   * {@code family:superColumn[:column]} for a super family. If no element
+   * matches, the mapping stays empty and no client is created. </p>
+   *
+   * @param filename classpath resource name of the mapping file
+   * @throws IOException if the resource is missing, cannot be parsed, or
+   *         contains an invalid field path
+   */
+  @SuppressWarnings("unchecked")
+  protected void readMapping(String filename) throws IOException {
+
+    mapping = new CassandraMapping();
+    columnMap = new HashMap<String, CassandraColumn>();
+
+    // getResourceAsStream returns null (it does not throw) for a missing resource.
+    java.io.InputStream mappingStream =
+      getClass().getClassLoader().getResourceAsStream(filename);
+    if (mappingStream == null) {
+      throw new IOException("Mapping file not found on classpath: " + filename);
+    }
+
+    try {
+      SAXBuilder builder = new SAXBuilder();
+      Document doc = builder.build(mappingStream);
+
+      List<Element> classes = doc.getRootElement().getChildren("class");
+      String keyClassName = keyClass.getCanonicalName();
+      String persistentClassName = persistentClass.getCanonicalName();
+
+      for(Element classElement: classes) {
+        // Compare from the known side so a missing attribute is a mismatch, not an NPE.
+        if(keyClassName != null
+            && keyClassName.equals(classElement.getAttributeValue("keyClass"))
+            && persistentClassName != null
+            && persistentClassName.equals(classElement.getAttributeValue("name"))) {
+
+          String keySpace = classElement.getAttributeValue("keyspace");
+          mapping.setKeySpace(keySpace);
+          client = createClient();
+          Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
+          for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
+            boolean isSuper = "Super".equals(e.getValue().get("Type"));
+            mapping.addColumnFamily(e.getKey(), isSuper);
+          }
+
+          List<Element> fields = classElement.getChildren("field");
+
+          for(Element field:fields) {
+            String fieldName = field.getAttributeValue("name");
+            String path = field.getAttributeValue("path");
+            String[] parts = (path == null) ? new String[0] : path.split(":");
+            if (parts.length == 0) {
+              throw new IOException("Field '" + fieldName
+                  + "' has no usable path in " + filename);
+            }
+            String columnFamily = parts[0];
+            String superColumn = null;
+            String column = null;
+
+            if (!mapping.getColumnFamilies().contains(columnFamily)) {
+              throw new IOException("Field '" + fieldName
+                  + "' refers to unknown column family '" + columnFamily
+                  + "' in keyspace '" + keySpace + "'");
+            }
+
+            boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
+            if (isSuper) {
+              if (parts.length < 2) {
+                throw new IOException("Field '" + fieldName
+                    + "' maps to super column family '" + columnFamily
+                    + "' but its path '" + path + "' names no super column");
+              }
+              superColumn = parts[1];
+              if (parts.length == 3) {
+                column = parts[2];
+              }
+            } else {
+              if (parts.length == 2) {
+                column = parts[1];
+              }
+            }
+
+            columnMap.put(fieldName,
+                new CassandraColumn(columnFamily, superColumn, column));
+          }
+
+          break;
+        }
+      }
+    } catch(IOException ex) {
+      // Already the declared type; rethrow without another wrapper.
+      throw ex;
+    } catch(Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      try {
+        mappingStream.close();
+      } catch (IOException ignored) {
+        // Best effort: a failure to close must not mask the real outcome.
+      }
+    }
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (added)
+++ incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<gora-orm>
+
+  <class name="org.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="Employee">
+    <field name="name" path="info:nm"/>
+    <field name="dateOfBirth" path="info:db"/>
+    <field name="ssn" path="info:sn"/>
+    <field name="salary" path="info:sl"/>
+  </class>
+
+  <class name="org.gora.examples.generated.WebPage" keyClass="java.lang.String" keyspace="WebPage">
+    <field name="url" path="c:u"/>
+    <field name="content" path="p:cnt:c"/>
+    <field name="parsedContent" path="p:parsedContent"/>
+    <field name="outlinks" path="p:outlinks"/>
+    <field name="metadata" path="c:mt"/>
+  </class>
+
+  <class name="org.gora.examples.generated.TokenDatum" keyClass="java.lang.String" keyspace="TokenDatum">
+    <field name="count" path="common:count"/>
+  </class>
+
+</gora-orm>

Added: incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties (added)
+++ incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties Fri Oct  8 21:17:10 2010
@@ -0,0 +1 @@
+gora.cassandrastore.servers=localhost:9160

Added: incubator/gora/trunk/gora-cassandra/src/test/java/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/test/java/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-core/build.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/build.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/build.xml (added)
+++ incubator/gora/trunk/gora-core/build.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project name="gora-core" default="compile">
+  <property name="project.dir" value="${basedir}/.."/>
+
+  <import file="${project.dir}/build-common.xml"/>
+</project>

Added: incubator/gora/trunk/gora-core/conf/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/conf/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-core/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/ivy/ivy.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/ivy/ivy.xml (added)
+++ incubator/gora/trunk/gora-core/ivy/ivy.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+
+<ivy-module version="2.0">
+    <info 
+      organisation="org.gora"
+      module="gora-core"
+      status="integration"/>
+
+  <configurations>
+    <include file="${project.dir}/ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications defaultconf="compile">
+    <artifact name="gora-core" conf="compile"/>
+    <artifact name="gora-core-test" conf="test"/>
+  </publications>
+
+  <dependencies>
+
+    <dependency org="commons-logging" name="commons-logging" rev="1.1.1" conf="*->default"/>
+    <dependency org="log4j" name="log4j" rev="1.2.15" conf="*->master"/>
+    <dependency org="commons-lang" name="commons-lang" rev="2.5"/>
+    
+    <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2" conf="*->default">
+      <exclude org="hsqldb" name="hsqldb"/>
+      <exclude org="net.sf.kosmosfs" name="kfs"/>
+      <exclude org="net.java.dev.jets3t" name="jets3t"/>
+      <exclude org="org.eclipse.jdt" name="core"/>
+      <exclude org="org.mortbay.jetty" name="jsp-*"/>
+    </dependency>
+    <dependency org="org.apache.hadoop" name="avro" rev="1.3.2" conf="*->default">
+      <exclude org="ant" name="ant"/>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency org="org.apache.hadoop" name="hadoop-test" rev="0.20.2" conf="test->master"/>
+    <dependency org="junit" name="junit" rev="4.6" conf="test->default"/>
+
+  </dependencies>
+</ivy-module>
+

Added: incubator/gora/trunk/gora-core/lib-ext/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/lib-ext/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-core/src/examples/avro/employee.json
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/avro/employee.json?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/avro/employee.json (added)
+++ incubator/gora/trunk/gora-core/src/examples/avro/employee.json Fri Oct  8 21:17:10 2010
@@ -0,0 +1,11 @@
+  {
+    "type": "record",
+    "name": "Employee",
+    "namespace": "org.gora.examples.generated",
+    "fields" : [
+      {"name": "name", "type": "string"},
+      {"name": "dateOfBirth", "type": "long"},
+      {"name": "ssn", "type": "string"},
+      {"name": "salary", "type": "int"}
+    ]
+  }

Added: incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json (added)
+++ incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json Fri Oct  8 21:17:10 2010
@@ -0,0 +1,8 @@
+{
+  "type": "record",
+  "name": "TokenDatum",
+  "namespace": "org.gora.examples.generated",
+  "fields" : [
+    {"name": "count", "type": "int"}
+  ]
+}

Added: incubator/gora/trunk/gora-core/src/examples/avro/webpage.json
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/avro/webpage.json?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/avro/webpage.json (added)
+++ incubator/gora/trunk/gora-core/src/examples/avro/webpage.json Fri Oct  8 21:17:10 2010
@@ -0,0 +1,20 @@
+{
+  "type": "record",
+  "name": "WebPage",
+  "namespace": "org.gora.examples.generated",
+  "fields" : [
+    {"name": "url", "type": "string"},
+    {"name": "content", "type": "bytes"},
+    {"name": "parsedContent", "type": {"type":"array", "items": "string"}},
+    {"name": "outlinks", "type": {"type":"map", "values":"string"}},
+    {"name": "metadata", "type": {
+      "name": "Metadata",
+      "type": "record",
+      "namespace": "org.gora.examples.generated",
+      "fields": [
+        {"name": "version", "type": "int"},
+        {"name": "data", "type": {"type": "map", "values": "string"}}
+      ]
+    }}
+  ]
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,136 @@
+
+package org.gora.examples;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.gora.examples.generated.Metadata;
+import org.gora.examples.generated.WebPage;
+import org.gora.store.DataStore;
+import org.gora.store.DataStoreFactory;
+
+/**
+ * Creates and stores some data to be used in the tests.
+ */
+public class WebPageDataCreator {
+
+  private static final Log log = LogFactory.getLog(WebPageDataCreator.class);
+
+  /** URLs of the generated pages; the index is shared with CONTENTS, LINKS and ANCHORS. */
+  public static final String[] URLS = {
+    "http://foo.com/",
+    "http://foo.com/1.html",
+    "http://foo.com/2.html",
+    "http://bar.com/3.jsp",
+    "http://bar.com/1.html",
+    "http://bar.com/",
+    "http://baz.com/1.jsp&q=barbaz",
+    "http://baz.com/1.jsp&q=barbaz&p=foo",
+    "http://baz.com/1.jsp&q=foo",
+    "http://bazbar.com",
+  };
+
+  /** URL -> its index in URLS. */
+  public static HashMap<String, Integer> URL_INDEXES = new HashMap<String, Integer>();
+
+  static {
+    for(int i=0; i<URLS.length; i++) {
+      URL_INDEXES.put(URLS[i], i);
+    }
+  }
+
+  /** Page content per URL; also split on spaces to form the parsed content. */
+  public static final String[] CONTENTS = {
+    "foo baz bar",
+    "foo",
+    "foo1 bar1 baz1",
+    "a b c d e",
+    "aa bb cc dd ee",
+    "1",
+    "2 3",
+    "a b b b b b a",
+    "a a a",
+    "foo bar baz",
+  };
+
+  /** Per page, the URLS indexes of the pages it links to. */
+  public static final int[][] LINKS = {
+    {1, 2, 3, 9},
+    {3, 9},
+    {},
+    {9},
+    {5},
+    {1, 2, 3, 4, 6, 7, 8, 9},
+    {1},
+    {2},
+    {3},
+    {8, 1},
+  };
+
+  /** Per page, the anchor text of each outlink, in the same order as LINKS. */
+  public static final String[][] ANCHORS = {
+    {"foo", "foo", "foo", "foo"},
+    {"a1", "a2"},
+    {},
+    {"anchor1"},
+    {"bar"},
+    {"a1", "a2", "a3", "a4","a5", "a6", "a7", "a8", "a9"},
+    {"foo"},
+    {"baz"},
+    {"bazbar"},
+    {"baz", "bar"},
+  };
+
+  /** Copy of URLS in natural String order. */
+  public static final String[] SORTED_URLS = URLS.clone();
+  static {
+    Arrays.sort(SORTED_URLS);
+  }
+
+  /**
+   * Stores one WebPage per entry of URLS into the given store and flushes it.
+   *
+   * <p> Each page gets its URL, its content as UTF-8 bytes, the content's
+   * space-separated tokens as parsed content, its outlinks with anchors, and
+   * a Metadata record with version 1 and a single "metakey" entry. </p>
+   *
+   * @param dataStore the store to fill
+   * @throws IOException if writing to the store fails
+   */
+  public static void createWebPageData(DataStore<String, WebPage> dataStore)
+  throws IOException {
+    WebPage page;
+    log.info("creating web page data");
+
+    for(int i=0; i<URLS.length; i++) {
+      page = new WebPage();
+      page.setUrl(new Utf8(URLS[i]));
+      // Fixed charset: the stored bytes must not depend on the platform default.
+      page.setContent(ByteBuffer.wrap(CONTENTS[i].getBytes("UTF-8")));
+      for(String token : CONTENTS[i].split(" ")) {
+        page.addToParsedContent(new Utf8(token));
+      }
+
+      for(int j=0; j<LINKS[i].length; j++) {
+        page.putToOutlinks(new Utf8(URLS[LINKS[i][j]]), new Utf8(ANCHORS[i][j]));
+      }
+
+      Metadata metadata = new Metadata();
+      metadata.setVersion(1);
+      metadata.putToData(new Utf8("metakey"), new Utf8("metavalue"));
+      page.setMetadata(metadata);
+
+      dataStore.put(URLS[i], page);
+    }
+    dataStore.flush();
+    log.info("finished creating web page data");
+  }
+
+  /**
+   * Creates the test data in a data store.
+   *
+   * @param args optional; args[0] is the data store class name, which
+   *        defaults to org.gora.hbase.store.HBaseStore
+   * @return always 0
+   */
+  public int run(String[] args) throws Exception {
+    String dataStoreClass = "org.gora.hbase.store.HBaseStore";
+    if(args.length > 0) {
+      dataStoreClass = args[0];
+    }
+
+    DataStore<String,WebPage> store
+      = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class);
+    createWebPageData(store);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    new WebPageDataCreator().run(args);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,102 @@
+package org.gora.examples.generated;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+/**
+ * Persistent bean for the Employee record (fields: name, dateOfBirth, ssn,
+ * salary), matching the Avro schema embedded in _SCHEMA.
+ *
+ * NOTE(review): lives in the "generated" package and looks compiler-generated;
+ * presumably it should be regenerated rather than edited by hand - confirm.
+ */
+@SuppressWarnings("all")
+public class Employee extends PersistentBase {
+  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"dateOfBirth\",\"type\":\"long\"},{\"name\":\"ssn\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"}]}");
+  // Field index and name pairs, in the same order as the schema's fields.
+  public static enum Field {
+    NAME(0,"name"),
+    DATE_OF_BIRTH(1,"dateOfBirth"),
+    SSN(2,"ssn"),
+    SALARY(3,"salary"),
+    ;
+    private int index;
+    private String name;
+    Field(int index, String name) {this.index=index;this.name=name;}
+    public int getIndex() {return index;}
+    public String getName() {return name;}
+    public String toString() {return name;}
+  };
+  public static final String[] _ALL_FIELDS = {"name","dateOfBirth","ssn","salary",};
+  static {
+    // Registers this class's field names with PersistentBase at class-load time.
+    PersistentBase.registerFields(Employee.class, _ALL_FIELDS);
+  }
+  private Utf8 name;
+  private long dateOfBirth;
+  private Utf8 ssn;
+  private int salary;
+  public Employee() {
+    this(new StateManagerImpl());
+  }
+  public Employee(StateManager stateManager) {
+    super(stateManager);
+  }
+  public Employee newInstance(StateManager stateManager) {
+    return new Employee(stateManager);
+  }
+  public Schema getSchema() { return _SCHEMA; }
+  // Returns the value of the field at the given schema position.
+  public Object get(int _field) {
+    switch (_field) {
+    case 0: return name;
+    case 1: return dateOfBirth;
+    case 2: return ssn;
+    case 3: return salary;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  // Sets the field at the given schema position. Does nothing when
+  // isFieldEqual reports the value as equal; otherwise the field is marked
+  // dirty before it is assigned.
+  @SuppressWarnings(value="unchecked")
+  public void put(int _field, Object _value) {
+    if(isFieldEqual(_field, _value)) return;
+    getStateManager().setDirty(this, _field);
+    switch (_field) {
+    case 0:name = (Utf8)_value; break;
+    case 1:dateOfBirth = (Long)_value; break;
+    case 2:ssn = (Utf8)_value; break;
+    case 3:salary = (Integer)_value; break;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  // Typed accessors below delegate to get(int) and put(int, Object).
+  public Utf8 getName() {
+    return (Utf8) get(0);
+  }
+  public void setName(Utf8 value) {
+    put(0, value);
+  }
+  public long getDateOfBirth() {
+    return (Long) get(1);
+  }
+  public void setDateOfBirth(long value) {
+    put(1, value);
+  }
+  public Utf8 getSsn() {
+    return (Utf8) get(2);
+  }
+  public void setSsn(Utf8 value) {
+    put(2, value);
+  }
+  public int getSalary() {
+    return (Integer) get(3);
+  }
+  public void setSalary(int value) {
+    put(3, value);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,93 @@
+package org.gora.examples.generated;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+/**
+ * Persistent record with two fields: an int <code>version</code> (index 0)
+ * and a <code>data</code> map from string to string (index 1).
+ */
+@SuppressWarnings("all")
+public class Metadata extends PersistentBase {
+  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Metadata\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}");
+  /** The fields of this record, each paired with its index and its name. */
+  public static enum Field {
+    VERSION(0,"version"),
+    DATA(1,"data"),
+    ;
+    private final int index;
+    private final String name;
+    Field(int index, String name) {this.index=index;this.name=name;}
+    public int getIndex() {return index;}
+    public String getName() {return name;}
+    public String toString() {return name;}
+  }
+  public static final String[] _ALL_FIELDS = {"version","data",};
+  static {
+    PersistentBase.registerFields(Metadata.class, _ALL_FIELDS);
+  }
+  private int version;
+  private Map<Utf8,Utf8> data;
+  /** Creates a Metadata backed by a new {@link StateManagerImpl}. */
+  public Metadata() {
+    this(new StateManagerImpl());
+  }
+  /**
+   * Creates a Metadata with the given state manager and an empty data map.
+   * @param stateManager the state manager handed to the superclass
+   */
+  public Metadata(StateManager stateManager) {
+    super(stateManager);
+    data = new StatefulHashMap<Utf8,Utf8>();
+  }
+  /** Returns a new Metadata constructed with the given state manager. */
+  public Metadata newInstance(StateManager stateManager) {
+    return new Metadata(stateManager);
+  }
+  /** Returns the schema held in {@link #_SCHEMA}. */
+  public Schema getSchema() { return _SCHEMA; }
+  /**
+   * Returns the value of the field at the given index (0 = version, 1 = data).
+   * @throws AvroRuntimeException for any other index
+   */
+  public Object get(int _field) {
+    switch (_field) {
+    case 0: return version;
+    case 1: return data;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  /**
+   * Stores a value into the field at the given index (0 = version, 1 = data).
+   * Returns without doing anything when <code>isFieldEqual</code> reports
+   * true; otherwise the field is flagged dirty before the value is assigned.
+   * @throws AvroRuntimeException for any other index
+   */
+  @SuppressWarnings(value="unchecked")
+  public void put(int _field, Object _value) {
+    if(isFieldEqual(_field, _value)) return;
+    getStateManager().setDirty(this, _field);
+    switch (_field) {
+    case 0:version = (Integer)_value; break;
+    case 1:data = (Map<Utf8,Utf8>)_value; break;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  public int getVersion() {
+    return (Integer) get(0);
+  }
+  public void setVersion(int value) {
+    put(0, value);
+  }
+  public Map<Utf8, Utf8> getData() {
+    return (Map<Utf8, Utf8>) get(1);
+  }
+  /** Returns the data entry for the key, or null when the map is null or has no such key. */
+  public Utf8 getFromData(Utf8 key) {
+    if (data == null) { return null; }
+    return data.get(key);
+  }
+  /**
+   * Flags the data field dirty and stores the entry. The map may have been
+   * set to null through {@link #put(int, Object)}; in that case a fresh map is
+   * created first, matching the null handling of {@link #getFromData} and
+   * {@link #removeFromData}.
+   */
+  public void putToData(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 1);
+    if (data == null) {
+      data = new StatefulHashMap<Utf8,Utf8>();
+    }
+    data.put(key, value);
+  }
+  /**
+   * Removes the data entry for the key, flagging the data field dirty.
+   * @return the removed value, or null when the map is null or has no such key
+   */
+  public Utf8 removeFromData(Utf8 key) {
+    if (data == null) { return null; }
+    getStateManager().setDirty(this, 1);
+    return data.remove(key);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,72 @@
+package org.gora.examples.generated;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+/** Persistent record with a single int field, <code>count</code>, at index 0. */
+@SuppressWarnings("all")
+public class TokenDatum extends PersistentBase {
+  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"TokenDatum\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}");
+
+  /** The fields of this record, each paired with its index and its name. */
+  public static enum Field {
+    COUNT(0,"count");
+
+    private int index;
+    private String name;
+
+    Field(int index, String name) {
+      this.index = index;
+      this.name = name;
+    }
+
+    public int getIndex() {
+      return index;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String toString() {
+      return name;
+    }
+  }
+
+  public static final String[] _ALL_FIELDS = {"count",};
+
+  static {
+    PersistentBase.registerFields(TokenDatum.class, _ALL_FIELDS);
+  }
+
+  private int count;
+
+  /** Creates a TokenDatum backed by a new {@link StateManagerImpl}. */
+  public TokenDatum() {
+    this(new StateManagerImpl());
+  }
+
+  /** Creates a TokenDatum, passing the given state manager to the superclass. */
+  public TokenDatum(StateManager stateManager) {
+    super(stateManager);
+  }
+
+  /** Returns a new TokenDatum constructed with the given state manager. */
+  public TokenDatum newInstance(StateManager stateManager) {
+    return new TokenDatum(stateManager);
+  }
+
+  /** Returns the schema held in {@link #_SCHEMA}. */
+  public Schema getSchema() {
+    return _SCHEMA;
+  }
+
+  /**
+   * Returns the count when the index is 0.
+   * @throws AvroRuntimeException for any other index
+   */
+  public Object get(int _field) {
+    if (_field == 0) {
+      return count;
+    }
+    throw new AvroRuntimeException("Bad index");
+  }
+
+  /**
+   * Stores the count when the index is 0. Returns without doing anything when
+   * <code>isFieldEqual</code> reports true; otherwise the field is flagged
+   * dirty before the index is examined.
+   * @throws AvroRuntimeException for any other index
+   */
+  @SuppressWarnings(value="unchecked")
+  public void put(int _field, Object _value) {
+    if (isFieldEqual(_field, _value)) {
+      return;
+    }
+    getStateManager().setDirty(this, _field);
+    if (_field != 0) {
+      throw new AvroRuntimeException("Bad index");
+    }
+    count = (Integer) _value;
+  }
+
+  public int getCount() {
+    final Integer stored = (Integer) get(0);
+    return stored.intValue();
+  }
+
+  public void setCount(int value) {
+    final Integer boxed = Integer.valueOf(value);
+    put(0, boxed);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,125 @@
+package org.gora.examples.generated;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+/**
+ * Persistent record with five fields: <code>url</code> (0),
+ * <code>content</code> (1), <code>parsedContent</code> (2),
+ * <code>outlinks</code> (3) and <code>metadata</code> (4).
+ */
+@SuppressWarnings("all")
+public class WebPage extends PersistentBase {
+  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}");
+  /** The fields of this record, each paired with its index and its name. */
+  public static enum Field {
+    URL(0,"url"),
+    CONTENT(1,"content"),
+    PARSED_CONTENT(2,"parsedContent"),
+    OUTLINKS(3,"outlinks"),
+    METADATA(4,"metadata"),
+    ;
+    private final int index;
+    private final String name;
+    Field(int index, String name) {this.index=index;this.name=name;}
+    public int getIndex() {return index;}
+    public String getName() {return name;}
+    public String toString() {return name;}
+  }
+  public static final String[] _ALL_FIELDS = {"url","content","parsedContent","outlinks","metadata",};
+  static {
+    PersistentBase.registerFields(WebPage.class, _ALL_FIELDS);
+  }
+  private Utf8 url;
+  private ByteBuffer content;
+  private GenericArray<Utf8> parsedContent;
+  private Map<Utf8,Utf8> outlinks;
+  private Metadata metadata;
+  /** Creates a WebPage backed by a new {@link StateManagerImpl}. */
+  public WebPage() {
+    this(new StateManagerImpl());
+  }
+  /**
+   * Creates a WebPage with the given state manager, an empty parsedContent
+   * array and an empty outlinks map.
+   * @param stateManager the state manager handed to the superclass
+   */
+  public WebPage(StateManager stateManager) {
+    super(stateManager);
+    parsedContent = new ListGenericArray<Utf8>(getSchema().getField("parsedContent").schema());
+    outlinks = new StatefulHashMap<Utf8,Utf8>();
+  }
+  /** Returns a new WebPage constructed with the given state manager. */
+  public WebPage newInstance(StateManager stateManager) {
+    return new WebPage(stateManager);
+  }
+  /** Returns the schema held in {@link #_SCHEMA}. */
+  public Schema getSchema() { return _SCHEMA; }
+  /**
+   * Returns the value of the field at the given index (0 = url, 1 = content,
+   * 2 = parsedContent, 3 = outlinks, 4 = metadata).
+   * @throws AvroRuntimeException for any other index
+   */
+  public Object get(int _field) {
+    switch (_field) {
+    case 0: return url;
+    case 1: return content;
+    case 2: return parsedContent;
+    case 3: return outlinks;
+    case 4: return metadata;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  /**
+   * Stores a value into the field at the given index (0 = url, 1 = content,
+   * 2 = parsedContent, 3 = outlinks, 4 = metadata). Returns without doing
+   * anything when <code>isFieldEqual</code> reports true; otherwise the field
+   * is flagged dirty before the value is assigned.
+   * @throws AvroRuntimeException for any other index
+   */
+  @SuppressWarnings(value="unchecked")
+  public void put(int _field, Object _value) {
+    if(isFieldEqual(_field, _value)) return;
+    getStateManager().setDirty(this, _field);
+    switch (_field) {
+    case 0:url = (Utf8)_value; break;
+    case 1:content = (ByteBuffer)_value; break;
+    case 2:parsedContent = (GenericArray<Utf8>)_value; break;
+    case 3:outlinks = (Map<Utf8,Utf8>)_value; break;
+    case 4:metadata = (Metadata)_value; break;
+    default: throw new AvroRuntimeException("Bad index");
+    }
+  }
+  public Utf8 getUrl() {
+    return (Utf8) get(0);
+  }
+  public void setUrl(Utf8 value) {
+    put(0, value);
+  }
+  public ByteBuffer getContent() {
+    return (ByteBuffer) get(1);
+  }
+  public void setContent(ByteBuffer value) {
+    put(1, value);
+  }
+  public GenericArray<Utf8> getParsedContent() {
+    return (GenericArray<Utf8>) get(2);
+  }
+  /**
+   * Flags the parsedContent field dirty and appends the element. The array
+   * may have been set to null through {@link #put(int, Object)}; in that case
+   * it is recreated the same way the constructor creates it.
+   */
+  public void addToParsedContent(Utf8 element) {
+    getStateManager().setDirty(this, 2);
+    if (parsedContent == null) {
+      parsedContent = new ListGenericArray<Utf8>(getSchema().getField("parsedContent").schema());
+    }
+    parsedContent.add(element);
+  }
+  public Map<Utf8, Utf8> getOutlinks() {
+    return (Map<Utf8, Utf8>) get(3);
+  }
+  /** Returns the outlink for the key, or null when the map is null or has no such key. */
+  public Utf8 getFromOutlinks(Utf8 key) {
+    if (outlinks == null) { return null; }
+    return outlinks.get(key);
+  }
+  /**
+   * Flags the outlinks field dirty and stores the entry. The map may have
+   * been set to null through {@link #put(int, Object)}; in that case a fresh
+   * map is created first, matching the null handling of
+   * {@link #getFromOutlinks} and {@link #removeFromOutlinks}.
+   */
+  public void putToOutlinks(Utf8 key, Utf8 value) {
+    getStateManager().setDirty(this, 3);
+    if (outlinks == null) {
+      outlinks = new StatefulHashMap<Utf8,Utf8>();
+    }
+    outlinks.put(key, value);
+  }
+  /**
+   * Removes the outlink for the key, flagging the outlinks field dirty.
+   * @return the removed value, or null when the map is null or has no such key
+   */
+  public Utf8 removeFromOutlinks(Utf8 key) {
+    if (outlinks == null) { return null; }
+    getStateManager().setDirty(this, 3);
+    return outlinks.remove(key);
+  }
+  public Metadata getMetadata() {
+    return (Metadata) get(4);
+  }
+  public void setMetadata(Metadata value) {
+    put(4, value);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,133 @@
+
+package org.gora.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.store.DataStore;
+import org.gora.store.DataStoreFactory;
+
+/**
+ * Example Hadoop job that counts the rows returned by a Gora {@link Query}.
+ * The count is accumulated in the Hadoop counter named {@link #ROWS} in the
+ * group {@link #COUNTER_GROUP}.
+ */
+public class QueryCounter<K, T extends Persistent> extends Configured implements Tool {
+
+  /** Group name of the counter that holds the row count. */
+  public static final String COUNTER_GROUP = "QueryCounter";
+  /** Name of the counter that holds the row count. */
+  public static final String ROWS = "ROWS";
+
+  /**
+   * Creates a QueryCounter that uses the given configuration.
+   * @param conf the configuration the job is created from
+   */
+  public QueryCounter(Configuration conf) {
+    setConf(conf);
+  }
+
+  /** Mapper that increments the {@link #ROWS} counter once per input record and emits nothing. */
+  public static class QueryCounterMapper<K, T extends Persistent>
+  extends GoraMapper<K, T
+    , NullWritable, NullWritable> {
+
+    @Override
+    protected void map(K key, T value,
+        Context context) throws IOException ,InterruptedException {
+
+      context.getCounter(COUNTER_GROUP, ROWS).increment(1L);
+    }
+  }
+
+  /** Returns the Query to count the results of. Subclasses can
+   * override this function to customize the query.
+   * @param dataStore the data store the query is created from
+   * @return the Query object to count the results of.
+   */
+  public Query<K, T> getQuery(DataStore<K,T> dataStore) {
+    Query<K,T> query = dataStore.newQuery();
+    return query;
+  }
+
+  /**
+   * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
+   * The job is map-only and discards its output; only the counter is of interest.
+   * @param dataStore the data store the input is read from
+   * @param query the query whose results are counted
+   * @return the configured, not yet submitted job
+   * @throws IOException
+   */
+  public Job createJob(DataStore<K,T> dataStore, Query<K,T> query) throws IOException {
+    Job job = new Job(getConf());
+
+    job.setJobName("QueryCounter");
+    job.setNumReduceTasks(0);
+    job.setJarByClass(getClass());
+    /* Mappers are initialized with GoraMapper.initMapperJob() */
+    GoraMapper.initMapperJob(job, query, dataStore, NullWritable.class
+        , NullWritable.class, QueryCounterMapper.class, true);
+
+    job.setOutputFormatClass(NullOutputFormat.class);
+    return job;
+  }
+
+
+  /**
+   * Runs the counting job and returns the number of results to the Query.
+   * @param dataStore the data store the input is read from
+   * @param query the query whose results are counted
+   * @return the value of the {@link #ROWS} counter after the job has completed
+   * @throws IOException if the job does not complete successfully
+   */
+  public long countQuery(DataStore<K,T> dataStore, Query<K,T> query) throws Exception {
+    Job job = createJob(dataStore, query);
+    // A failed job leaves the counter incomplete, so never report it as a count.
+    if (!job.waitForCompletion(true)) {
+      throw new IOException("QueryCounter job did not complete successfully");
+    }
+
+    return job.getCounters().findCounter(COUNTER_GROUP, ROWS).getValue();
+  }
+
+  /**
+   * Returns the number of results to the Query obtained by the
+   * {@link #getQuery(DataStore)} method.
+   * @param dataStore the data store the query is created from and run against
+   * @throws IOException if the job does not complete successfully
+   */
+  public long countQuery(DataStore<K,T> dataStore) throws Exception {
+    return countQuery(dataStore, getQuery(dataStore));
+  }
+
+  /**
+   * Command line entry point. Expects the key class and persistent class
+   * names, optionally followed by the data store class name.
+   * @return 1 when fewer than two arguments are given, 0 otherwise
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 2) {
+      System.err.println("Usage QueryCounter <keyClass> <persistentClass> [dataStoreClass]");
+      return 1;
+    }
+
+    Class<K> keyClass = (Class<K>) Class.forName(args[0]);
+    Class<T> persistentClass = (Class<T>) Class.forName(args[1]);
+
+    DataStore<K,T> dataStore;
+
+    if(args.length > 2) {
+      Class<? extends DataStore<K,T>> dataStoreClass
+          = (Class<? extends DataStore<K, T>>) Class.forName(args[2]);
+      dataStore = DataStoreFactory.getDataStore(dataStoreClass, keyClass, persistentClass);
+    }
+    else {
+      dataStore = DataStoreFactory.getDataStore(keyClass, persistentClass);
+    }
+
+    long results = countQuery(dataStore);
+
+    System.out.println("Number of result to the query:" + results);
+
+    return 0;
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new QueryCounter(new Configuration()), args);
+    System.exit(ret);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,152 @@
+
+package org.gora.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.gora.examples.generated.TokenDatum;
+import org.gora.examples.generated.WebPage;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.mapreduce.GoraReducer;
+import org.gora.query.Query;
+import org.gora.store.DataStore;
+import org.gora.store.DataStoreFactory;
+
+/**
+ * Classic word count example in Gora. Reads {@link WebPage} objects from an
+ * input {@link DataStore}, counts the whitespace-separated tokens of their
+ * content and writes one {@link TokenDatum} per distinct token to an output
+ * {@link DataStore}.
+ */
+public class WordCount extends Configured implements Tool {
+
+  /** Creates a WordCount with no configuration set. */
+  public WordCount() {
+
+  }
+
+  /**
+   * Creates a WordCount that uses the given configuration.
+   * @param conf the configuration the job is created from
+   */
+  public WordCount(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * TokenizerMapper takes &lt;String, WebPage&gt; pairs as obtained
+   * from the input DataStore, and tokenizes the content via
+   * {@link WebPage#getContent()}. Each token is emitted as a
+   * &lt;Text, IntWritable&gt; pair whose value is always one.
+   */
+  public static class TokenizerMapper
+    extends GoraMapper<String, WebPage, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    @Override
+    protected void map(String key, WebPage page, Context context)
+      throws IOException ,InterruptedException {
+
+      //Get the content from a WebPage as obtained from the DataStore
+      java.nio.ByteBuffer rawContent = page.getContent();
+      if (rawContent == null) {
+        return; // a page without content has nothing to tokenize
+      }
+
+      // Copy only the readable region (position..limit). The backing array
+      // can be larger than the content or not accessible at all, and reading
+      // through a duplicate leaves the page's own buffer position untouched.
+      java.nio.ByteBuffer view = rawContent.duplicate();
+      byte[] bytes = new byte[view.remaining()];
+      view.get(bytes);
+
+      // Decode with a fixed charset so results do not depend on the platform
+      // default. NOTE(review): assumes the stored content is UTF-8 text.
+      String content = new String(bytes, "UTF-8");
+
+      StringTokenizer itr = new StringTokenizer(content);
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  /**
+   * WordCountReducer sums the counts emitted for a token and writes the
+   * total as a {@link TokenDatum} keyed by the token's string form.
+   */
+  public static class WordCountReducer extends GoraReducer<Text, IntWritable,
+  String, TokenDatum> {
+
+    // A single instance is reused for every key that is written.
+    TokenDatum result = new TokenDatum();
+
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
+      throws IOException ,InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.setCount(sum);
+      context.write(key.toString(), result);
+    }
+
+  }
+
+  /**
+   * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
+   * @param inStore the data store the web pages are read from
+   * @param query the query selecting the web pages to process
+   * @param outStore the data store the token counts are written to
+   * @return the configured, not yet submitted job
+   * @throws IOException
+   */
+  public Job createJob(DataStore<String,WebPage> inStore, Query<String,WebPage> query
+      , DataStore<String,TokenDatum> outStore) throws IOException {
+    Job job = new Job(getConf());
+
+    job.setJobName("WordCount");
+
+    job.setNumReduceTasks(10);
+    job.setJarByClass(getClass());
+
+    /* Mappers are initialized with GoraMapper#initMapperJob().
+     * Instead of the TokenizerMapper defined here, if the input is not
+     * obtained via Gora, any other mapper can be used, such as
+     * Hadoop-MapReduce's WordCount.TokenizerMapper.
+     */
+    GoraMapper.initMapperJob(job, query, inStore, Text.class
+        , IntWritable.class, TokenizerMapper.class, true);
+
+    /* Reducers are initialized with GoraReducer#initReducerJob().
+     * If the output is not to be persisted via Gora, any reducer
+     * can be used instead.
+     */
+    GoraReducer.initReducerJob(job, outStore, WordCountReducer.class);
+
+    //TODO: set combiner
+
+    return job;
+  }
+
+  /**
+   * Runs the word count over every web page of the input store.
+   * @param inStore the data store the web pages are read from
+   * @param outStore the data store the token counts are written to
+   * @return 0 if the job completed successfully, 1 otherwise
+   */
+  public int wordCount(DataStore<String,WebPage> inStore,
+      DataStore<String, TokenDatum> outStore) throws IOException, InterruptedException, ClassNotFoundException {
+    Query<String,WebPage> query = inStore.newQuery();
+
+    Job job = createJob(inStore, query, outStore);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  /**
+   * Command line entry point. The first argument, if present, names the
+   * input data store class; the second, if present, names the output data
+   * store class and otherwise the input class is used for both. With no
+   * arguments both stores are obtained without naming a store class.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+
+    DataStore<String,WebPage> inStore;
+    DataStore<String, TokenDatum> outStore;
+
+    if(args.length > 0) {
+      String dataStoreClass = args[0];
+      inStore = DataStoreFactory.getDataStore(dataStoreClass,
+          String.class, WebPage.class);
+      if(args.length > 1) {
+        dataStoreClass = args[1];
+      }
+      outStore = DataStoreFactory.getDataStore(dataStoreClass,
+          String.class, TokenDatum.class);
+    } else {
+      inStore = DataStoreFactory.getDataStore(String.class, WebPage.class);
+      outStore = DataStoreFactory.getDataStore(String.class, TokenDatum.class);
+    }
+
+    return wordCount(inStore, outStore);
+  }
+
+  // Usage WordCount [<input datastore class> [output datastore class]]
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new WordCount(), args);
+    System.exit(ret);
+  }
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,232 @@
+
+package org.gora.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.gora.mapreduce.FakeResolvingDecoder;
+import org.gora.persistency.ListGenericArray;
+import org.gora.persistency.Persistent;
+import org.gora.persistency.State;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.StatefulMap;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.util.IOUtils;
+
+/**
+ * A {@link SpecificDatumReader} for {@link Persistent} objects. When
+ * <code>readDirtyBits</code> is enabled, the dirty and readable flags of the
+ * top-level record's fields, and the per-key states of maps, are read from
+ * the stream together with the data.
+ */
+public class PersistentDatumReader<T extends Persistent>
+  extends SpecificDatumReader<T> {
+
+  private Schema rootSchema;
+  private T cachedPersistent; // for creating objects
+
+  // One resolving decoder per underlying decoder, so it is not rebuilt on every read.
+  private WeakHashMap<Decoder, ResolvingDecoder> decoderCache
+    = new WeakHashMap<Decoder, ResolvingDecoder>();
+
+  private boolean readDirtyBits = true;
+
+  /** Creates a reader without a schema; dirty bits are read by default. */
+  public PersistentDatumReader() {
+  }
+
+  /**
+   * Creates a reader for the given schema.
+   * @param schema the schema of the top-level record
+   * @param readDirtyBits whether state information is present in the stream
+   */
+  public PersistentDatumReader(Schema schema, boolean readDirtyBits) {
+    this.readDirtyBits = readDirtyBits;
+    setSchema(schema);
+  }
+
+  /** Remembers the schema as the root schema and passes it on to the superclass. */
+  @Override
+  public void setSchema(Schema actual) {
+    this.rootSchema = actual;
+    super.setSchema(actual);
+  }
+
+  /**
+   * Returns a new object of the root schema's type. The first object is
+   * created through the superclass and kept as a template; later objects are
+   * created by calling <code>newInstance</code> on that template.
+   */
+  @SuppressWarnings("unchecked")
+  public T newPersistent() {
+    if(cachedPersistent == null) {
+      cachedPersistent = (T)super.newRecord(null, rootSchema);
+      return cachedPersistent; //we can return the cached object
+    }
+    return (T)cachedPersistent.newInstance(new StateManagerImpl());
+  }
+
+  /**
+   * Returns <code>old</code> when it is not null; otherwise a new persistent
+   * object for the root schema, or whatever the superclass creates for any
+   * other schema.
+   */
+  @Override
+  protected Object newRecord(Object old, Schema schema) {
+    if(old != null) {
+      return old;
+    }
+
+    if(schema.equals(rootSchema)) {
+      return newPersistent();
+    } else {
+      return super.newRecord(old, schema);
+    }
+  }
+
+  /** Reads an object of the root schema from the decoder. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public T read(T reuse, Decoder in) throws IOException {
+    return (T) read(reuse, rootSchema, in);
+  }
+
+  /** Reads a datum of the given schema through the resolving decoder cached for the decoder. */
+  public Object read(Object reuse, Schema schema, Decoder decoder)
+    throws IOException {
+    return super.read(reuse, schema, getResolvingDecoder(decoder));
+  }
+
+  /** Returns the resolving decoder cached for the decoder, creating and caching it on first use. */
+  protected ResolvingDecoder getResolvingDecoder(Decoder decoder)
+  throws IOException {
+    ResolvingDecoder resolvingDecoder = decoderCache.get(decoder);
+    if(resolvingDecoder == null) {
+      resolvingDecoder = new FakeResolvingDecoder(rootSchema, decoder);
+      decoderCache.put(decoder, resolvingDecoder);
+    }
+    return resolvingDecoder;
+  }
+
+  /**
+   * Reads a record. For the root schema with <code>readDirtyBits</code>
+   * enabled, the record is cleared, the dirty and readable flag arrays are
+   * read first, only the fields flagged readable are read, and finally the
+   * dirty flags are applied. In every other case all fields are read in
+   * schema order.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected Object readRecord(Object old, Schema expected, ResolvingDecoder in)
+      throws IOException {
+
+    Object record = newRecord(old, expected);
+
+    //check if top-level
+    if(expected.equals(rootSchema) && readDirtyBits) {
+      T persistent = (T)record;
+      persistent.clear();
+
+      boolean[] dirtyFields = IOUtils.readBoolArray(in);
+      boolean[] readableFields = IOUtils.readBoolArray(in);
+
+      //read fields
+      for (Field f : expected.getFields()) {
+        if(readableFields[f.pos()]) {
+          int pos = f.pos();
+          String name = f.name();
+          Object oldDatum = (old != null) ? getField(record, name, pos) : null;
+          setField(record, name, pos, read(oldDatum, f.schema(), in));
+        }
+      }
+
+      // Now set changed bits
+      for (int i = 0; i < dirtyFields.length; i++) {
+        if (dirtyFields[i]) {
+          persistent.setDirty(i);
+        }
+      }
+      return record;
+    } else {
+      //since ResolvingDecoder.readFieldOrder is final, we cannot override it
+      //so this is a copy of super.readRecord, with the readFieldOrder change
+
+      for (Field f : expected.getFields()) {
+        int pos = f.pos();
+        String name = f.name();
+        Object oldDatum = (old != null) ? getField(record, name, pos) : null;
+        setField(record, name, pos, read(oldDatum, f.schema(), in));
+      }
+
+      return record;
+    }
+  }
+
+  /**
+   * Reads a map. With <code>readDirtyBits</code> enabled the stream first
+   * carries the number of key states followed by (key, state ordinal) pairs,
+   * then the map entries themselves. The states are buffered and applied only
+   * after the entries have been read.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected Object readMap(Object old, Schema expected, ResolvingDecoder in)
+      throws IOException {
+
+    StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) newMap(old, 0);
+    map.clearStates();
+    if (!readDirtyBits) {
+      return super.readMap(map, expected, in);
+    }
+
+    int size = in.readInt();
+    Map<Utf8, State> pendingStates = new java.util.LinkedHashMap<Utf8, State>();
+    for (int j = 0; j < size; j++) {
+      Utf8 key = in.readString(null);
+      State state = State.values()[in.readInt()];
+      pendingStates.put(key, state);
+    }
+
+    // NOTE(review): the inherited readMap is expected to obtain its target
+    // through newMap(...), which clears the states of a StatefulHashMap, and
+    // filling the map may record states of its own. Applying the states read
+    // above before this call would therefore lose them - confirm against the
+    // Avro version in use.
+    StatefulMap<Utf8, ?> filled
+      = (StatefulMap<Utf8, ?>) super.readMap(map, expected, in);
+    filled.clearStates();
+    for (Map.Entry<Utf8, State> entry : pendingStates.entrySet()) {
+      filled.putState(entry.getKey(), entry.getValue());
+    }
+    return filled;
+  }
+
+  /**
+   * Returns <code>old</code>, with its entries and states cleared, when it
+   * is a {@link StatefulHashMap}; otherwise a new {@link StatefulHashMap}.
+   * The size hint is not used.
+   */
+  @Override
+  @SuppressWarnings({ "rawtypes" })
+  protected Object newMap(Object old, int size) {
+    if (old instanceof StatefulHashMap) {
+      ((Map) old).clear();
+      ((StatefulHashMap)old).clearStates();
+      return old;
+    }
+    return new StatefulHashMap<Object, Object>();
+  }
+
+  /** Called to create new array instances. Returns <code>old</code>,
+   * cleared, when it is a {@link ListGenericArray}; otherwise a new
+   * {@link ListGenericArray} for the given size and schema. */
+  @Override
+  @SuppressWarnings("rawtypes")
+  protected Object newArray(Object old, int size, Schema schema) {
+    if (old instanceof ListGenericArray) {
+      ((GenericArray) old).clear();
+      return old;
+    } else return new ListGenericArray(size, schema);
+  }
+
+  /**
+   * Returns a copy of the persistent object. Map, array, record and string
+   * fields are copied through {@link #cloneObject}; null-typed fields are
+   * skipped; every other field value is stored in the copy as is.
+   * @param persistent the object to copy
+   * @param schema the schema listing the fields to copy
+   * @return a new object created through <code>persistent.newInstance</code>
+   */
+  public Persistent clone(Persistent persistent, Schema schema) {
+    Persistent cloned = persistent.newInstance(new StateManagerImpl());
+    List<Field> fields = schema.getFields();
+    for(Field field: fields) {
+      int pos = field.pos();
+      switch(field.schema().getType()) {
+        case MAP    :
+        case ARRAY  :
+        case RECORD :
+        case STRING : cloned.put(pos, cloneObject(
+            field.schema(), persistent.get(pos), cloned.get(pos))); break;
+        case NULL   : break;
+        default     : cloned.put(pos, persistent.get(pos)); break;
+      }
+    }
+
+    return cloned;
+  }
+
+  /**
+   * Returns a copy of a single value according to its schema type. Maps and
+   * arrays are copied element by element, records through {@link #clone},
+   * strings through <code>createString</code>; any other value is returned
+   * unchanged.
+   * @param schema the schema of the value
+   * @param toClone the value to copy, may be null
+   * @param cloned an existing map or array to reuse as the target, may be null
+   * @return the copy, or null when <code>toClone</code> is null
+   */
+  @SuppressWarnings("unchecked")
+  protected Object cloneObject(Schema schema, Object toClone, Object cloned) {
+    if(toClone == null) {
+      return null;
+    }
+
+    switch(schema.getType()) {
+      case MAP    :
+        Map<Utf8, Object> map = (Map<Utf8, Object>)newMap(cloned, 0);
+        for(Map.Entry<Utf8, Object> entry: ((Map<Utf8, Object>)toClone).entrySet()) {
+          map.put((Utf8)createString(entry.getKey().toString())
+              , cloneObject(schema.getValueType(), entry.getValue(), null));
+        }
+        return map;
+      case ARRAY  :
+        GenericArray<Object> array = (GenericArray<Object>)
+          newArray(cloned, (int)((GenericArray<?>)toClone).size(), schema);
+        for(Object element: (GenericArray<Object>)toClone) {
+          array.add(cloneObject(schema.getElementType(), element, null));
+        }
+        return array;
+      case RECORD : return clone((Persistent)toClone, schema);
+      case STRING : return createString(toClone.toString());
+      default     : return toClone; //shallow copy is enough
+    }
+  }
+}



Mime
View raw message