Return-Path: Delivered-To: apmail-incubator-gora-commits-archive@minotaur.apache.org Received: (qmail 29337 invoked from network); 8 Oct 2010 21:17:55 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 8 Oct 2010 21:17:55 -0000 Received: (qmail 47217 invoked by uid 500); 8 Oct 2010 21:17:55 -0000 Delivered-To: apmail-incubator-gora-commits-archive@incubator.apache.org Received: (qmail 47192 invoked by uid 500); 8 Oct 2010 21:17:55 -0000 Mailing-List: contact gora-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: gora-dev@incubator.apache.org Delivered-To: mailing list gora-commits@incubator.apache.org Received: (qmail 47185 invoked by uid 99); 8 Oct 2010 21:17:55 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Oct 2010 21:17:55 +0000 X-ASF-Spam-Status: No, hits=-1998.4 required=10.0 tests=ALL_TRUSTED,FR_ALMOST_VIAG2,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Oct 2010 21:17:42 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 16DBE2388A02; Fri, 8 Oct 2010 21:17:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1006024 [2/8] - in /incubator/gora: branches/ tags/ trunk/ trunk/bin/ trunk/conf/ trunk/docs/ trunk/gora-cassandra/ trunk/gora-cassandra/ivy/ trunk/gora-cassandra/lib-ext/ trunk/gora-cassandra/src/ trunk/gora-cassandra/src/examples/ trunk/... 
Date: Fri, 08 Oct 2010 21:17:17 -0000 To: gora-commits@incubator.apache.org From: enis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101008211720.16DBE2388A02@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/Select.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,78 @@ +package org.gora.cassandra.client; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.gora.util.ByteUtils; + +public class Select { + + private static final SlicePredicate ALL_PREDICATE; + + static { + ALL_PREDICATE = new SlicePredicate(); + ALL_PREDICATE.setSlice_range(new SliceRange(new byte[0], new byte[0], + false, Integer.MAX_VALUE)); + } + + private Map predicateMap; + + public Select() { + predicateMap = new HashMap(); + } + + private SlicePredicate getOrCreate(ColumnParent columnParent) { + SlicePredicate predicate = predicateMap.get(columnParent); + if (predicate == null) { + predicate = new SlicePredicate(); + predicateMap.put(columnParent, predicate); + } + return predicate; + } + + public Select addColumnName(String superColumnFamily, String superColumn, + String columnName) { + ColumnParent parent = new ColumnParent(superColumnFamily); + parent.setSuper_column(ByteUtils.toBytes(superColumn)); + SlicePredicate predicate = getOrCreate(parent); + if (predicate.getSlice_range() != null) { + // TODO: Make this another exception 
+ throw new RuntimeException("Can't add columns if slice_range is not null"); + } + predicate.addToColumn_names(ByteUtils.toBytes(columnName)); + return this; + } + + public Select addColumnName(String columnFamily, String columnName) { + SlicePredicate predicate = getOrCreate(new ColumnParent(columnFamily)); + if (predicate.getSlice_range() != null) { + // TODO: Make this another exception + throw new RuntimeException("Can't add columns if slice_range is not null"); + } + predicate.addToColumn_names(ByteUtils.toBytes(columnName)); + return this; + } + + public Select addSuperColumnAll(String superColumnFamily) { + return addColumnAll(superColumnFamily); + } + + public Select addAllColumnsForSuperColumn(String superColumnFamily, String superColumnName) { + ColumnParent parent = new ColumnParent(superColumnFamily); + parent.setSuper_column(ByteUtils.toBytes(superColumnName)); + predicateMap.put(parent, ALL_PREDICATE); + return this; + } + + public Select addColumnAll(String columnFamily) { + predicateMap.put(new ColumnParent(columnFamily), ALL_PREDICATE); + return this; + } + + /*package*/ Map getPredicateMap() { + return predicateMap; + } +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SimpleCassandraClient.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,247 @@ +package org.gora.cassandra.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.cassandra.thrift.KeySlice; +import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.TokenRange; +import org.apache.commons.lang.NotImplementedException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class SimpleCassandraClient implements CassandraClient { + + public Cassandra.Client client; + + private TTransport transport; + + private String keySpace; + + private ConsistencyLevel consistencyLevel; + + private static ExecutorService SERVICE = + Executors.newSingleThreadScheduledExecutor(); + + private static int NUM_CLIENTS = 0; + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + synchronized (SERVICE) { + if (!SERVICE.isShutdown()) { + SERVICE.shutdown(); + } + } + } + }); + } + + public SimpleCassandraClient(String host, int port, String keySpace) + throws TTransportException { + this(host, port, keySpace, ConsistencyLevel.ONE); + } + + public SimpleCassandraClient(String host, int port, + String keySpace, ConsistencyLevel consistencyLevel) + throws TTransportException { + this.transport = new TSocket(host, port); + this.transport.open(); + this.client = new Cassandra.Client(new TBinaryProtocol(transport)); + setKeySpace(keySpace); + setConsistencyLevel(consistencyLevel); + synchronized (SERVICE) { + NUM_CLIENTS++; + if (SERVICE.isShutdown()) { + SERVICE 
= Executors.newSingleThreadScheduledExecutor(); + } + } + } + + public Cassandra.Client getClient() { + return client; + } + + @Override + public Map> describeKeySpace() + throws IOException { + try { + return client.describe_keyspace(keySpace); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public Row get(String key, Select select) throws IOException { + List sliceGets = + new ArrayList(); + Map predicateMap = select.getPredicateMap(); + + for (Entry e : predicateMap.entrySet()) { + sliceGets.add(new SliceGet(client, keySpace, key, + e.getKey(), e.getValue(), consistencyLevel)); + } + List>>> results; + try { + results = SERVICE.invokeAll(sliceGets); + } catch (InterruptedException e1) { + throw new IOException(e1); + } + Row row = new Row(key); + for (Future>> f : results) { + Pair> pair; + try { + pair = f.get(); + } catch (Exception e1) { + throw new IOException(e1); + } + for (ColumnOrSuperColumn csc : pair.getSecond()) { + ColumnParent parent = pair.getFirst(); + row.addColumnOrSuperColumn(parent.column_family, parent.super_column, csc); + } + } + return row; + } + + public List getRangeIntl(String startKey, String endKey, int rowCount, Select select) + throws IOException { + KeyRange range = new KeyRange(rowCount).setStart_key(startKey).setEnd_key(endKey); + return getKeyRange(range, select); + } + + public List getTokenRangeIntl(String startToken, String endToken, int rowCount, + Select select) throws IOException { + KeyRange range = + new KeyRange(rowCount).setStart_token(startToken).setEnd_token(endToken); + return getKeyRange(range, select); + } + + @Override + public RowIterable getRange(String startKey, String endKey, int rowCount, + Select select) throws IOException { + // TODO: Not yet implemented!!! 
+ throw new NotImplementedException("Not yet implemented!"); + } + + @Override + public RowIterable getTokenRange(String startToken, String endToken, + int rowCount, Select select) throws IOException { + return new TokenRangeRowIterableImpl(this, startToken, endToken, rowCount, select); + } + + private List getKeyRange(KeyRange keyRange, Select select) + throws IOException { + List rangeSliceGets = + new ArrayList(); + Map predicateMap = select.getPredicateMap(); + + for (Entry e : predicateMap.entrySet()) { + rangeSliceGets.add(new RangeSliceGet(client, keySpace, keyRange, + e.getKey(), e.getValue(), consistencyLevel)); + } + List>>> results; + try { + results = SERVICE.invokeAll(rangeSliceGets); + } catch (InterruptedException e1) { + throw new IOException(e1); + } + + Map rowMap = new HashMap(); + + for (Future>> keySlicesTask : results) { + Pair> keySlices; + try { + keySlices = keySlicesTask.get(); + } catch (Exception e) { + throw new IOException(e); + } + for (KeySlice keySlice : keySlices.getSecond()) { + addKeySliceToRowMap(rowMap, keySlices.getFirst(), keySlice); + } + } + + return new ArrayList(rowMap.values()); + } + + private void addKeySliceToRowMap(Map rowMap, ColumnParent parent, + KeySlice keySlice) { + Row row = rowMap.get(keySlice.key); + if (row == null) { + row = new Row(keySlice.key); + rowMap.put(row.getKey(), row); + } + for (ColumnOrSuperColumn csc : keySlice.columns) { + row.addColumnOrSuperColumn(parent.column_family, parent.super_column, csc); + } + } + + @Override + public void mutate(String key, Mutate mutation) throws IOException { + Map>> rowMutations = + new HashMap>>(); + rowMutations.put(key, mutation.getMutationMap()); + try { + client.batch_mutate(keySpace, rowMutations, consistencyLevel); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void setKeySpace(String keySpace) { + this.keySpace = keySpace; + } + + @Override + public void setConsistencyLevel(ConsistencyLevel level) { + 
this.consistencyLevel = level; + } + + @Override + public void close() { + transport.close(); + synchronized (SERVICE) { + NUM_CLIENTS--; + if (NUM_CLIENTS == 0 && !SERVICE.isShutdown()) { + SERVICE.shutdown(); + } + } + } + + @Override + public List describeRing() throws IOException { + try { + return client.describe_ring(keySpace); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public List describeSplits(String startToken, String endToken, + int size) throws IOException { + try { + return client.describe_splits(startToken, endToken, size); + } catch (TException e) { + throw new IOException(e); + } + } +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/SliceGet.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,39 @@ +package org.gora.cassandra.client; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.SlicePredicate; + +public class SliceGet implements Callable>> { + + private Cassandra.Client client; + private String keySpace; + private String key; + private ColumnParent parent; + private SlicePredicate predicate; + private ConsistencyLevel consistencyLevel; + + public SliceGet(Cassandra.Client client, String keySpace, String key, + ColumnParent parent, SlicePredicate predicate, + ConsistencyLevel consistencyLevel) { + this.client = client; 
+ this.keySpace = keySpace; + this.key = key; + this.parent = parent; + this.predicate = predicate; + this.consistencyLevel = consistencyLevel; + } + + @Override + public Pair> call() + throws Exception { + return new Pair>(parent, + client.get_slice(keySpace, key, parent, predicate, consistencyLevel)); + } + +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/client/TokenRangeRowIterableImpl.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,92 @@ +package org.gora.cassandra.client; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; + +class TokenRangeRowIterableImpl implements RowIterable { + + private static final Log LOG = LogFactory.getLog(RowIterable.class); + + private final SimpleCassandraClient client; + + private String startToken; + + private String endToken; + + private final int batchCount; + + private Select select; + + private List rows; + + private int rowIndex; + + TokenRangeRowIterableImpl(SimpleCassandraClient client, + String startToken, String endToken, int batchCount, + Select select) { + this.client = client; + this.startToken = startToken; + this.endToken = endToken; + this.batchCount = batchCount; + this.select = select; + this.rows = null; + this.rowIndex = 0; + } + + @SuppressWarnings({ 
"unchecked", "rawtypes" }) + private void maybeInit() throws IOException { + if (rows != null && rowIndex >= rows.size()) { + rows = null; + } + + if (rows != null) { + return; + } + + rows = client.getTokenRangeIntl(startToken, endToken, batchCount, select); + + if (rows.isEmpty()) { + rows = null; + return; + } + + rowIndex = 0; + Row lastRow = rows.get(rows.size() - 1); + IPartitioner p = DatabaseDescriptor.getPartitioner(); + startToken = p.getTokenFactory().toString(p.getToken(lastRow.getKey())); + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + try { + maybeInit(); + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + return false; + } + return rows != null; + } + + @Override + public Row next() { + return rows.get(rowIndex++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraPartitionQuery.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,76 @@ +package org.gora.cassandra.query; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.gora.persistency.Persistent; +import org.gora.query.Query; +import org.gora.query.impl.PartitionQueryImpl; + +public class CassandraPartitionQuery +extends PartitionQueryImpl { + + private String startToken; + + private String endToken; + + private String[] endPoints; + + 
private int splitSize; + + public CassandraPartitionQuery() { + this.dataStore = null; + } + + public CassandraPartitionQuery(Query baseQuery, String startToken, String endToken, String[] endPoints, + int splitSize) { + super(baseQuery); + this.startToken = startToken; + this.endToken = endToken; + this.endPoints = endPoints; + this.splitSize = splitSize; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, startToken); + Text.writeString(out, endToken); + out.writeInt(endPoints.length); + for (String endPoint : endPoints) { + Text.writeString(out, endPoint); + } + out.writeInt(splitSize); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + startToken = Text.readString(in); + endToken = Text.readString(in); + int size = in.readInt(); + endPoints = new String[size]; + for (int i = 0; i < size; i++) { + endPoints[i] = Text.readString(in); + } + splitSize = in.readInt(); + } + + public String getStartToken() { + return startToken; + } + + public String getEndToken() { + return endToken; + } + + public String[] getEndPoints() { + return endPoints; + } + + public int getSplitSize() { + return splitSize; + } +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraQuery.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,17 @@ +package org.gora.cassandra.query; + +import org.gora.persistency.Persistent; +import org.gora.query.impl.QueryBase; +import org.gora.store.DataStore; + +public class 
CassandraQuery +extends QueryBase { + + public CassandraQuery() { + super(null); + } + + public CassandraQuery(DataStore dataStore) { + super(dataStore); + } +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/query/CassandraResult.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,140 @@ +package org.gora.cassandra.query; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Iterator; + +import org.gora.cassandra.client.CassandraClient; +import org.gora.cassandra.client.Row; +import org.gora.cassandra.client.Select; +import org.gora.cassandra.store.CassandraStore; +import org.gora.persistency.Persistent; +import org.gora.query.Query; +import org.gora.query.impl.ResultBase; +import org.gora.store.DataStore; + +public class CassandraResult +extends ResultBase { + + private Iterator rowIter; + + private CassandraStore store; + + private String[] fields; + + public CassandraResult(DataStore dataStore, Query query, + int batchRowCount) throws IOException { + super(dataStore, query); + + store = (CassandraStore) dataStore; + fields = query.getFields(); + + boolean isUsingTokens = (query instanceof CassandraPartitionQuery); + String startTokenOrKey; + String endTokenOrKey; + + if (isUsingTokens) { + CassandraPartitionQuery partitionQuery = (CassandraPartitionQuery) query; + startTokenOrKey = partitionQuery.getStartToken(); + endTokenOrKey = partitionQuery.getEndToken(); + } else { + CassandraQuery cassandraQuery = (CassandraQuery) query; + startTokenOrKey = 
cassandraQuery.getStartKey().toString(); + endTokenOrKey = cassandraQuery.getEndKey().toString(); + } + + Select select = store.createSelect(fields); + + CassandraClient client = store.getClientByLocation(getLocation(query)); + if (isUsingTokens) { + rowIter = + client.getTokenRange(startTokenOrKey, endTokenOrKey, + batchRowCount, select).iterator(); + } else { + rowIter = client.getRange(startTokenOrKey, endTokenOrKey, + batchRowCount, select).iterator(); + } + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + @Override + protected boolean nextInner() throws IOException { + if (!rowIter.hasNext()) { + return false; + } + Row row = rowIter.next(); + if (row == null) { + return false; + } + + key = toKey(row.getKey()); + persistent = store.newInstance(row, fields); + + return true; + } + + @SuppressWarnings("unchecked") + private K toKey(String keyStr) { + Class keyClass = dataStore.getKeyClass(); + if (keyClass.isAssignableFrom(String.class)) { + return (K) keyStr; + } + if (keyClass.isAssignableFrom(Integer.class)) { + return (K) (Integer) Integer.parseInt(keyStr); + } + if (keyClass.isAssignableFrom(Float.class)) { + return (K) (Float) Float.parseFloat(keyStr); + } + if (keyClass.isAssignableFrom(Double.class)) { + return (K) (Double) Double.parseDouble(keyStr); + } + if (keyClass.isAssignableFrom(Long.class)) { + return (K) (Long) Long.parseLong(keyStr); + } + if (keyClass.isAssignableFrom(Short.class)) { + return (K) (Short) Short.parseShort(keyStr); + } + if (keyClass.isAssignableFrom(Byte.class)) { + return (K) (Byte) Byte.parseByte(keyStr); + } + + throw new RuntimeException("Can't parse " + keyStr + + " as an instance of " + keyClass); + } + + @Override + public void close() throws IOException { } + + private String getLocation(Query query) { + if (!(query instanceof CassandraPartitionQuery)) { + return null; + } + CassandraPartitionQuery partitonQuery = + (CassandraPartitionQuery) query; + InetAddress[] localAddresses = 
new InetAddress[0]; + try { + localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress()); + } catch (UnknownHostException e) { + throw new AssertionError(e); + } + for (InetAddress address : localAddresses) { + for (String location : partitonQuery.getEndPoints()) { + InetAddress locationAddress = null; + try { + locationAddress = InetAddress.getByName(location); + } catch (UnknownHostException e) { + throw new AssertionError(e); + } + if (address.equals(locationAddress)) { + return location; + } + } + } + return partitonQuery.getEndPoints()[0]; + } +} \ No newline at end of file Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraColumn.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,55 @@ +package org.gora.cassandra.store; + +class CassandraColumn { + String family; + String superColumn; + String column; + + public CassandraColumn(String family, String superColumn, String column) { + this.family = family; + this.superColumn = superColumn; + this.column = column; + } + + public boolean isSuperColumn() { + return superColumn != null; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((column == null) ? 0 : column.hashCode()); + result = prime * result + ((family == null) ? 0 : family.hashCode()); + result = prime * result + + ((superColumn == null) ? 
0 : superColumn.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CassandraColumn other = (CassandraColumn) obj; + if (column == null) { + if (other.column != null) + return false; + } else if (!column.equals(other.column)) + return false; + if (family == null) { + if (other.family != null) + return false; + } else if (!family.equals(other.family)) + return false; + if (superColumn == null) { + if (other.superColumn != null) + return false; + } else if (!superColumn.equals(other.superColumn)) + return false; + return true; + } +} Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraMapping.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,33 @@ +package org.gora.cassandra.store; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class CassandraMapping { + + private String keySpace; + + private Map families = + new HashMap(); + + public String getKeySpace() { + return keySpace; + } + + public void setKeySpace(String keySpace) { + this.keySpace = keySpace; + } + + public Set getColumnFamilies() { + return families.keySet(); + } + + public void addColumnFamily(String columnFamily, boolean isSuper) { + families.put(columnFamily, isSuper); + } + + public boolean isColumnFamilySuper(String columnFamily) { + return families.get(columnFamily); + } +} Added: 
incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java (added) +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/gora/cassandra/store/CassandraStore.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,448 @@ +package org.gora.cassandra.store; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.util.Utf8; +import org.apache.cassandra.thrift.TokenRange; +import org.gora.cassandra.client.CassandraClient; +import org.gora.cassandra.client.Mutate; +import org.gora.cassandra.client.Row; +import org.gora.cassandra.client.Select; +import org.gora.cassandra.client.SimpleCassandraClient; +import org.gora.cassandra.query.CassandraPartitionQuery; +import org.gora.cassandra.query.CassandraQuery; +import org.gora.cassandra.query.CassandraResult; +import org.gora.persistency.ListGenericArray; +import org.gora.persistency.Persistent; +import org.gora.persistency.State; +import org.gora.persistency.StateManager; +import org.gora.persistency.StatefulHashMap; +import org.gora.persistency.StatefulMap; +import org.gora.query.PartitionQuery; +import org.gora.query.Query; +import org.gora.query.Result; +import org.gora.store.DataStoreFactory; +import org.gora.store.impl.DataStoreBase; +import org.gora.util.ByteUtils; +import org.jdom.Document; +import 
org.jdom.Element; +import org.jdom.input.SAXBuilder; + +/** + * DataStore for Cassandra. + * + *

Note: CassandraStore is not thread-safe.

+ */ +public class CassandraStore +extends DataStoreBase { + + private static final String ERROR_MESSAGE = + "Cassandra does not support creating or modifying ColumnFamilies during runtime"; + + private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml"; + + private static final int SPLIT_SIZE = 65536; + + private static final int BATCH_COUNT = 256; + + private CassandraClient client; + + private Map columnMap; + + private CassandraMapping mapping; + + @Override + public void initialize(Class keyClass, Class persistentClass, + Properties properties) throws IOException { + super.initialize(keyClass, persistentClass, properties); + + String mappingFile = + DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); + + readMapping(mappingFile); + } + + @Override + public String getSchemaName() { + return mapping.getKeySpace(); + } + + @Override + public void createSchema() throws IOException { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void deleteSchema() throws IOException { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public boolean schemaExists() throws IOException { + return true; + } + + public CassandraClient getClientByLocation(String endPoint) { + return client; + } + + public Select createSelect(String[] fields) { + Select select = new Select(); + if (fields == null) { + fields = beanFactory.getCachedPersistent().getFields(); + } + for (String f : fields) { + CassandraColumn col = columnMap.get(f); + Schema fieldSchema = fieldMap.get(f).schema(); + switch (fieldSchema.getType()) { + case MAP: + case ARRAY: + if (col.isSuperColumn()) { + select.addAllColumnsForSuperColumn(col.family, col.superColumn); + } else { + select.addColumnAll(col.family); + } + break; + default: + if (col.isSuperColumn()) { + select.addColumnName(col.family, col.superColumn, col.column); + } else { + select.addColumnName(col.family, col.column); + } + break; + } + } + return 
select; + } + + @Override + public T get(K key, String[] fields) throws IOException { + if (fields == null) { + fields = beanFactory.getCachedPersistent().getFields(); + } + Select select = createSelect(fields); + try { + Row result = client.get(key.toString(), select); + return newInstance(result, fields); + } catch (Exception e) { + throw new IOException(e); + } + } + + @SuppressWarnings("rawtypes") + private void setField(T persistent, Field field, StatefulMap map) { + persistent.put(field.pos(), map); + } + + private void setField(T persistent, Field field, byte[] val) + throws IOException { + persistent.put(field.pos() + , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos()))); + } + + @SuppressWarnings("rawtypes") + private void setField(T persistent, Field field, GenericArray list) { + persistent.put(field.pos(), list); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public T newInstance(Row result, String[] fields) + throws IOException { + if(result == null) + return null; + + T persistent = newPersistent(); + StateManager stateManager = persistent.getStateManager(); + for (String f : fields) { + CassandraColumn col = columnMap.get(f); + Field field = fieldMap.get(f); + Schema fieldSchema = field.schema(); + Map qualMap; + switch(fieldSchema.getType()) { + case MAP: + if (col.isSuperColumn()) { + qualMap = result.getSuperColumn(col.family, col.superColumn); + } else { + qualMap = result.getColumn(col.family); + } + if (qualMap == null) { + continue; + } + Schema valueSchema = fieldSchema.getValueType(); + StatefulMap map = new StatefulHashMap(); + for (Entry e : qualMap.entrySet()) { + Utf8 mapKey = new Utf8(e.getKey()); + map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null)); + map.putState(mapKey, State.CLEAN); + } + setField(persistent, field, map); + break; + case ARRAY: + if (col.isSuperColumn()) { + qualMap = result.getSuperColumn(col.family, col.superColumn); + } else { + qualMap = 
result.getColumn(col.family); + } + if (qualMap == null) { + continue; + } + valueSchema = fieldSchema.getElementType(); + ArrayList arrayList = new ArrayList(); + for (Entry e : qualMap.entrySet()) { + arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null)); + } + ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList); + setField(persistent, field, arr); + break; + default: + byte[] val; + if (col.isSuperColumn()) { + val = result.get(col.family, col.superColumn, col.column); + } else { + val = result.get(col.family, col.column); + } + if (val == null) { + continue; + } + setField(persistent, field, val); + break; + } + } + stateManager.clearDirty(persistent); + return persistent; + } + + @Override + public void put(K key, T obj) throws IOException { + Mutate mutate = new Mutate(); + Schema schema = obj.getSchema(); + StateManager stateManager = obj.getStateManager(); + List fields = schema.getFields(); + String qual; + byte[] value; + for (int i = 0; i < fields.size(); i++) { + if (!stateManager.isDirty(obj, i)) { + continue; + } + Field field = fields.get(i); + Type type = field.schema().getType(); + Object o = obj.get(i); + CassandraColumn col = columnMap.get(field.name()); + + switch(type) { + case MAP: + if(o instanceof StatefulMap) { + @SuppressWarnings("unchecked") + StatefulMap map = (StatefulMap) o; + for (Entry e : map.states().entrySet()) { + Utf8 mapKey = e.getKey(); + switch (e.getValue()) { + case DIRTY: + qual = mapKey.toString(); + value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter); + if (col.isSuperColumn()) { + mutate.put(col.family, col.superColumn, qual, value); + } else { + mutate.put(col.family, qual, value); + } + break; + case DELETED: + qual = mapKey.toString(); + if (col.isSuperColumn()) { + mutate.delete(col.family, col.superColumn, qual); + } else { + mutate.delete(col.family, qual); + } + break; + } + } + } else { + @SuppressWarnings({ "rawtypes", "unchecked" }) 
+ Set set = ((Map)o).entrySet(); + for(@SuppressWarnings("rawtypes") Entry entry: set) { + qual = entry.getKey().toString(); + value = ByteUtils.toBytes(entry.getValue().toString()); + if (col.isSuperColumn()) { + mutate.put(col.family, col.superColumn, qual, value); + } else { + mutate.put(col.family, qual, value); + } + } + } + break; + case ARRAY: + if(o instanceof GenericArray) { + @SuppressWarnings("rawtypes") + GenericArray arr = (GenericArray) o; + int j=0; + for(Object item : arr) { + value = ByteUtils.toBytes(item.toString()); + if (col.isSuperColumn()) { + mutate.put(col.family, col.superColumn, Integer.toString(j), value); + } else { + mutate.put(col.family, Integer.toString(j), value); + } + j++; + } + } + break; + default: + value = ByteUtils.toBytes(o, field.schema(), datumWriter); + if (col.isSuperColumn()) { + mutate.put(col.family, col.superColumn, col.column, value); + } else { + mutate.put(col.family, col.column, value); + } + break; + } + } + + if(!mutate.isEmpty()) + client.mutate(key.toString(), mutate); + } + + @Override + public boolean delete(K key) throws IOException { + Mutate mutate = new Mutate(); + for (String family : mapping.getColumnFamilies()) { + mutate.deleteAll(family); + } + + client.mutate(key.toString(), mutate); + return true; + } + + @Override + public void flush() throws IOException { } + + @Override + public void close() throws IOException { + client.close(); + } + + @Override + public Query newQuery() { + return new CassandraQuery(this); + } + + @Override + public long deleteByQuery(Query query) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Result execute(Query query) throws IOException { + return new CassandraResult(this, query, BATCH_COUNT); + } + + @Override + public List> getPartitions(Query query) + throws IOException { + List> partitions = new ArrayList>(); + + List rangeList = client.describeRing(); + for (TokenRange range : rangeList) { + List tokens = + 
client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE); + // turn the sub-ranges into InputSplits + String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]); + // hadoop needs hostname, not ip + for (int i = 0; i < endpoints.length; i++) { + endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName(); + } + + for (int i = 1; i < tokens.size(); i++) { + CassandraPartitionQuery partitionQuery = + new CassandraPartitionQuery(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE); + partitions.add(partitionQuery); + } + } + return partitions; + } + + private CassandraClient createClient() throws IOException { + String serverStr = + DataStoreFactory.findPropertyOrDie(properties, this, "servers"); + String[] server1Parts = serverStr.split(",")[0].split(":"); + try { + return new SimpleCassandraClient(server1Parts[0], + Integer.parseInt(server1Parts[1]), mapping.getKeySpace()); + } catch (Exception e) { + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + protected void readMapping(String filename) throws IOException { + + mapping = new CassandraMapping(); + columnMap = new HashMap(); + + try { + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build(getClass().getClassLoader() + .getResourceAsStream(filename)); + + List classes = doc.getRootElement().getChildren("class"); + + for(Element classElement: classes) { + if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName()) + && classElement.getAttributeValue("name").equals( + persistentClass.getCanonicalName())) { + + String keySpace = classElement.getAttributeValue("keyspace"); + mapping.setKeySpace(keySpace); + client = createClient(); + Map> keySpaceDesc = client.describeKeySpace(); + for (Entry> e : keySpaceDesc.entrySet()) { + boolean isSuper = e.getValue().get("Type").equals("Super"); + mapping.addColumnFamily(e.getKey(), isSuper); + } + + List fields = classElement.getChildren("field"); + + for(Element 
field:fields) { + String fieldName = field.getAttributeValue("name"); + String path = field.getAttributeValue("path"); + String[] parts = path.split(":"); + String columnFamily = parts[0]; + String superColumn = null; + String column = null; + + boolean isSuper = mapping.isColumnFamilySuper(columnFamily); + if (isSuper) { + superColumn = parts[1]; + if (parts.length == 3) { + column = parts[2]; + } + } else { + if (parts.length == 2) { + column = parts[1]; + } + } + + columnMap.put(fieldName, + new CassandraColumn(columnFamily, superColumn, column)); + } + + break; + } + } + } catch(Exception ex) { + throw new IOException(ex); + } + } +} \ No newline at end of file Added: incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml (added) +++ incubator/gora/trunk/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml Fri Oct 8 21:17:10 2010 @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + + + + + Added: incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties (added) +++ incubator/gora/trunk/gora-cassandra/src/test/conf/gora.properties Fri Oct 8 21:17:10 2010 @@ -0,0 +1 @@ +gora.cassandrastore.servers=localhost:9160 Added: incubator/gora/trunk/gora-cassandra/src/test/java/.gitignore URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/test/java/.gitignore?rev=1006024&view=auto ============================================================================== 
(empty) Added: incubator/gora/trunk/gora-core/build.xml URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/build.xml?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/build.xml (added) +++ incubator/gora/trunk/gora-core/build.xml Fri Oct 8 21:17:10 2010 @@ -0,0 +1,7 @@ + + + + + + + Added: incubator/gora/trunk/gora-core/conf/.gitignore URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/conf/.gitignore?rev=1006024&view=auto ============================================================================== (empty) Added: incubator/gora/trunk/gora-core/ivy/ivy.xml URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/ivy/ivy.xml?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/ivy/ivy.xml (added) +++ incubator/gora/trunk/gora-core/ivy/ivy.xml Fri Oct 8 21:17:10 2010 @@ -0,0 +1,41 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Added: incubator/gora/trunk/gora-core/lib-ext/.gitignore URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/lib-ext/.gitignore?rev=1006024&view=auto ============================================================================== (empty) Added: incubator/gora/trunk/gora-core/src/examples/avro/employee.json URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/avro/employee.json?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/src/examples/avro/employee.json (added) +++ incubator/gora/trunk/gora-core/src/examples/avro/employee.json Fri Oct 8 21:17:10 2010 @@ -0,0 +1,11 @@ + { + "type": "record", + "name": "Employee", + "namespace": "org.gora.examples.generated", + "fields" : [ + {"name": "name", "type": "string"}, + {"name": "dateOfBirth", "type": "long"}, + {"name": "ssn", "type": 
"string"}, + {"name": "salary", "type": "int"} + ] + } Added: incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json (added) +++ incubator/gora/trunk/gora-core/src/examples/avro/tokendatum.json Fri Oct 8 21:17:10 2010 @@ -0,0 +1,8 @@ +{ + "type": "record", + "name": "TokenDatum", + "namespace": "org.gora.examples.generated", + "fields" : [ + {"name": "count", "type": "int"} + ] +} Added: incubator/gora/trunk/gora-core/src/examples/avro/webpage.json URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/avro/webpage.json?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/src/examples/avro/webpage.json (added) +++ incubator/gora/trunk/gora-core/src/examples/avro/webpage.json Fri Oct 8 21:17:10 2010 @@ -0,0 +1,20 @@ +{ + "type": "record", + "name": "WebPage", + "namespace": "org.gora.examples.generated", + "fields" : [ + {"name": "url", "type": "string"}, + {"name": "content", "type": "bytes"}, + {"name": "parsedContent", "type": {"type":"array", "items": "string"}}, + {"name": "outlinks", "type": {"type":"map", "values":"string"}}, + {"name": "metadata", "type": { + "name": "Metadata", + "type": "record", + "namespace": "org.gora.examples.generated", + "fields": [ + {"name": "version", "type": "int"}, + {"name": "data", "type": {"type": "map", "values": "string"}} + ] + }} + ] +} Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java?rev=1006024&view=auto 
============================================================================== --- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java (added) +++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/WebPageDataCreator.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,136 @@ + +package org.gora.examples; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.avro.util.Utf8; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.gora.examples.generated.Metadata; +import org.gora.examples.generated.WebPage; +import org.gora.store.DataStore; +import org.gora.store.DataStoreFactory; + +/** + * Creates and stores some data to be used in the tests. + */ +public class WebPageDataCreator { + + private static final Log log = LogFactory.getLog(WebPageDataCreator.class); + + public static final String[] URLS = { + "http://foo.com/", + "http://foo.com/1.html", + "http://foo.com/2.html", + "http://bar.com/3.jsp", + "http://bar.com/1.html", + "http://bar.com/", + "http://baz.com/1.jsp&q=barbaz", + "http://baz.com/1.jsp&q=barbaz&p=foo", + "http://baz.com/1.jsp&q=foo", + "http://bazbar.com", + }; + + public static HashMap URL_INDEXES = new HashMap(); + + static { + for(int i=0; i dataStore) + throws IOException { + WebPage page; + log.info("creating web page data"); + + for(int i=0; i 0) { + dataStoreClass = args[0]; + } + + DataStore store + = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class); + createWebPageData(store); + + return 0; + } + + public static void main(String[] args) throws Exception { + new WebPageDataCreator().run(args); + } +} Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java?rev=1006024&view=auto 
============================================================================== --- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java (added) +++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Employee.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,102 @@ +package org.gora.examples.generated; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import org.apache.avro.Protocol; +import org.apache.avro.Schema; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Protocol; +import org.apache.avro.util.Utf8; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificExceptionBase; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificFixed; +import org.gora.persistency.StateManager; +import org.gora.persistency.impl.PersistentBase; +import org.gora.persistency.impl.StateManagerImpl; +import org.gora.persistency.StatefulHashMap; +import org.gora.persistency.ListGenericArray; + +@SuppressWarnings("all") +public class Employee extends PersistentBase { + public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"dateOfBirth\",\"type\":\"long\"},{\"name\":\"ssn\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"}]}"); + public static enum Field { + NAME(0,"name"), + DATE_OF_BIRTH(1,"dateOfBirth"), + SSN(2,"ssn"), + SALARY(3,"salary"), + ; + private int index; + private String name; + Field(int index, String name) {this.index=index;this.name=name;} + public int getIndex() {return index;} + public String getName() {return name;} + public String toString() {return name;} + }; + public static final String[] _ALL_FIELDS = 
{"name","dateOfBirth","ssn","salary",}; + static { + PersistentBase.registerFields(Employee.class, _ALL_FIELDS); + } + private Utf8 name; + private long dateOfBirth; + private Utf8 ssn; + private int salary; + public Employee() { + this(new StateManagerImpl()); + } + public Employee(StateManager stateManager) { + super(stateManager); + } + public Employee newInstance(StateManager stateManager) { + return new Employee(stateManager); + } + public Schema getSchema() { return _SCHEMA; } + public Object get(int _field) { + switch (_field) { + case 0: return name; + case 1: return dateOfBirth; + case 2: return ssn; + case 3: return salary; + default: throw new AvroRuntimeException("Bad index"); + } + } + @SuppressWarnings(value="unchecked") + public void put(int _field, Object _value) { + if(isFieldEqual(_field, _value)) return; + getStateManager().setDirty(this, _field); + switch (_field) { + case 0:name = (Utf8)_value; break; + case 1:dateOfBirth = (Long)_value; break; + case 2:ssn = (Utf8)_value; break; + case 3:salary = (Integer)_value; break; + default: throw new AvroRuntimeException("Bad index"); + } + } + public Utf8 getName() { + return (Utf8) get(0); + } + public void setName(Utf8 value) { + put(0, value); + } + public long getDateOfBirth() { + return (Long) get(1); + } + public void setDateOfBirth(long value) { + put(1, value); + } + public Utf8 getSsn() { + return (Utf8) get(2); + } + public void setSsn(Utf8 value) { + put(2, value); + } + public int getSalary() { + return (Integer) get(3); + } + public void setSalary(int value) { + put(3, value); + } +} Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java?rev=1006024&view=auto ============================================================================== --- 
incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java (added) +++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/Metadata.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,93 @@ +package org.gora.examples.generated; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import org.apache.avro.Protocol; +import org.apache.avro.Schema; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Protocol; +import org.apache.avro.util.Utf8; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificExceptionBase; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificFixed; +import org.gora.persistency.StateManager; +import org.gora.persistency.impl.PersistentBase; +import org.gora.persistency.impl.StateManagerImpl; +import org.gora.persistency.StatefulHashMap; +import org.gora.persistency.ListGenericArray; + +@SuppressWarnings("all") +public class Metadata extends PersistentBase { + public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"Metadata\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}"); + public static enum Field { + VERSION(0,"version"), + DATA(1,"data"), + ; + private int index; + private String name; + Field(int index, String name) {this.index=index;this.name=name;} + public int getIndex() {return index;} + public String getName() {return name;} + public String toString() {return name;} + }; + public static final String[] _ALL_FIELDS = {"version","data",}; + static { + PersistentBase.registerFields(Metadata.class, _ALL_FIELDS); + } + private int version; + private Map data; + public Metadata() { + this(new StateManagerImpl()); + } + public 
Metadata(StateManager stateManager) { + super(stateManager); + data = new StatefulHashMap(); + } + public Metadata newInstance(StateManager stateManager) { + return new Metadata(stateManager); + } + public Schema getSchema() { return _SCHEMA; } + public Object get(int _field) { + switch (_field) { + case 0: return version; + case 1: return data; + default: throw new AvroRuntimeException("Bad index"); + } + } + @SuppressWarnings(value="unchecked") + public void put(int _field, Object _value) { + if(isFieldEqual(_field, _value)) return; + getStateManager().setDirty(this, _field); + switch (_field) { + case 0:version = (Integer)_value; break; + case 1:data = (Map)_value; break; + default: throw new AvroRuntimeException("Bad index"); + } + } + public int getVersion() { + return (Integer) get(0); + } + public void setVersion(int value) { + put(0, value); + } + public Map getData() { + return (Map) get(1); + } + public Utf8 getFromData(Utf8 key) { + if (data == null) { return null; } + return data.get(key); + } + public void putToData(Utf8 key, Utf8 value) { + getStateManager().setDirty(this, 1); + data.put(key, value); + } + public Utf8 removeFromData(Utf8 key) { + if (data == null) { return null; } + getStateManager().setDirty(this, 1); + return data.remove(key); + } +} Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java (added) +++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/TokenDatum.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,72 @@ +package org.gora.examples.generated; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import 
org.apache.avro.Protocol; +import org.apache.avro.Schema; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Protocol; +import org.apache.avro.util.Utf8; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificExceptionBase; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificFixed; +import org.gora.persistency.StateManager; +import org.gora.persistency.impl.PersistentBase; +import org.gora.persistency.impl.StateManagerImpl; +import org.gora.persistency.StatefulHashMap; +import org.gora.persistency.ListGenericArray; + +@SuppressWarnings("all") +public class TokenDatum extends PersistentBase { + public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"TokenDatum\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}"); + public static enum Field { + COUNT(0,"count"), + ; + private int index; + private String name; + Field(int index, String name) {this.index=index;this.name=name;} + public int getIndex() {return index;} + public String getName() {return name;} + public String toString() {return name;} + }; + public static final String[] _ALL_FIELDS = {"count",}; + static { + PersistentBase.registerFields(TokenDatum.class, _ALL_FIELDS); + } + private int count; + public TokenDatum() { + this(new StateManagerImpl()); + } + public TokenDatum(StateManager stateManager) { + super(stateManager); + } + public TokenDatum newInstance(StateManager stateManager) { + return new TokenDatum(stateManager); + } + public Schema getSchema() { return _SCHEMA; } + public Object get(int _field) { + switch (_field) { + case 0: return count; + default: throw new AvroRuntimeException("Bad index"); + } + } + @SuppressWarnings(value="unchecked") + public void put(int _field, Object _value) { + if(isFieldEqual(_field, _value)) return; + 
getStateManager().setDirty(this, _field); + switch (_field) { + case 0:count = (Integer)_value; break; + default: throw new AvroRuntimeException("Bad index"); + } + } + public int getCount() { + return (Integer) get(0); + } + public void setCount(int value) { + put(0, value); + } +} Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java (added) +++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/generated/WebPage.java Fri Oct 8 21:17:10 2010 @@ -0,0 +1,125 @@ +package org.gora.examples.generated; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import org.apache.avro.Protocol; +import org.apache.avro.Schema; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Protocol; +import org.apache.avro.util.Utf8; +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificExceptionBase; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificFixed; +import org.gora.persistency.StateManager; +import org.gora.persistency.impl.PersistentBase; +import org.gora.persistency.impl.StateManagerImpl; +import org.gora.persistency.StatefulHashMap; +import org.gora.persistency.ListGenericArray; + +@SuppressWarnings("all") +public class WebPage extends PersistentBase { + public static final Schema _SCHEMA = 
Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.gora.examples.generated\",\"fields\":[{\"name\":\"url\",\"type\":\"string\"},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"parsedContent\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"record\",\"name\":\"Metadata\",\"fields\":[{\"name\":\"version\",\"type\":\"int\"},{\"name\":\"data\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}}]}"); + public static enum Field { + URL(0,"url"), + CONTENT(1,"content"), + PARSED_CONTENT(2,"parsedContent"), + OUTLINKS(3,"outlinks"), + METADATA(4,"metadata"), + ; + private int index; + private String name; + Field(int index, String name) {this.index=index;this.name=name;} + public int getIndex() {return index;} + public String getName() {return name;} + public String toString() {return name;} + }; + public static final String[] _ALL_FIELDS = {"url","content","parsedContent","outlinks","metadata",}; + static { + PersistentBase.registerFields(WebPage.class, _ALL_FIELDS); + } + private Utf8 url; + private ByteBuffer content; + private GenericArray parsedContent; + private Map outlinks; + private Metadata metadata; + public WebPage() { + this(new StateManagerImpl()); + } + public WebPage(StateManager stateManager) { + super(stateManager); + parsedContent = new ListGenericArray(getSchema().getField("parsedContent").schema()); + outlinks = new StatefulHashMap(); + } + public WebPage newInstance(StateManager stateManager) { + return new WebPage(stateManager); + } + public Schema getSchema() { return _SCHEMA; } + public Object get(int _field) { + switch (_field) { + case 0: return url; + case 1: return content; + case 2: return parsedContent; + case 3: return outlinks; + case 4: return metadata; + default: throw new AvroRuntimeException("Bad index"); + } + } + @SuppressWarnings(value="unchecked") + public void put(int _field, 
Object _value) { + if(isFieldEqual(_field, _value)) return; + getStateManager().setDirty(this, _field); + switch (_field) { + case 0:url = (Utf8)_value; break; + case 1:content = (ByteBuffer)_value; break; + case 2:parsedContent = (GenericArray)_value; break; + case 3:outlinks = (Map)_value; break; + case 4:metadata = (Metadata)_value; break; + default: throw new AvroRuntimeException("Bad index"); + } + } + public Utf8 getUrl() { + return (Utf8) get(0); + } + public void setUrl(Utf8 value) { + put(0, value); + } + public ByteBuffer getContent() { + return (ByteBuffer) get(1); + } + public void setContent(ByteBuffer value) { + put(1, value); + } + public GenericArray getParsedContent() { + return (GenericArray) get(2); + } + public void addToParsedContent(Utf8 element) { + getStateManager().setDirty(this, 2); + parsedContent.add(element); + } + public Map getOutlinks() { + return (Map) get(3); + } + public Utf8 getFromOutlinks(Utf8 key) { + if (outlinks == null) { return null; } + return outlinks.get(key); + } + public void putToOutlinks(Utf8 key, Utf8 value) { + getStateManager().setDirty(this, 3); + outlinks.put(key, value); + } + public Utf8 removeFromOutlinks(Utf8 key) { + if (outlinks == null) { return null; } + getStateManager().setDirty(this, 3); + return outlinks.remove(key); + } + public Metadata getMetadata() { + return (Metadata) get(4); + } + public void setMetadata(Metadata value) { + put(4, value); + } +} Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java?rev=1006024&view=auto ============================================================================== --- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java (added) +++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/QueryCounter.java Fri Oct 8 21:17:10 
2010 @@ -0,0 +1,133 @@

package org.gora.examples.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.gora.mapreduce.GoraMapper;
import org.gora.persistency.Persistent;
import org.gora.query.Query;
import org.gora.store.DataStore;
import org.gora.store.DataStoreFactory;

/**
 * Example Hadoop job that counts the rows returned by a Gora {@link Query}.
 *
 * <p>The job is map-only: every input row increments the
 * {@link #COUNTER_GROUP}/{@link #ROWS} counter, and the counter value is read
 * back once the job has completed. No output is written.
 *
 * @param <K> the key class of the data store
 * @param <T> the persistent class of the data store
 */
public class QueryCounter<K, T extends Persistent> extends Configured implements Tool {

  /** Hadoop counter group used to accumulate the row count. */
  public static final String COUNTER_GROUP = "QueryCounter";
  /** Counter name, within {@link #COUNTER_GROUP}, holding the row count. */
  public static final String ROWS = "ROWS";

  public QueryCounter(Configuration conf) {
    setConf(conf);
  }

  /**
   * Mapper that increments the row counter once per input row and emits nothing.
   */
  public static class QueryCounterMapper<K, T extends Persistent>
      extends GoraMapper<K, T, NullWritable, NullWritable> {

    @Override
    protected void map(K key, T value, Context context)
        throws IOException, InterruptedException {
      context.getCounter(COUNTER_GROUP, ROWS).increment(1L);
    }
  }

  /**
   * Returns the Query to count the results of. Subclasses can override this
   * method to customize the query; the default is {@code dataStore.newQuery()}.
   *
   * @param dataStore the store to build the query against
   * @return the Query object to count the results of
   */
  public Query<K, T> getQuery(DataStore<K, T> dataStore) {
    return dataStore.newQuery();
  }

  /**
   * Creates and returns the map-only {@link Job} for submitting to Hadoop MapReduce.
   *
   * @param dataStore the store to read the input rows from
   * @param query the query selecting the rows to count
   * @return the configured, not yet submitted, job
   * @throws IOException if the job cannot be created or configured
   */
  public Job createJob(DataStore<K, T> dataStore, Query<K, T> query) throws IOException {
    Job job = new Job(getConf());

    job.setJobName("QueryCounter");
    job.setNumReduceTasks(0);
    job.setJarByClass(getClass());
    /* Mappers are initialized with GoraMapper.initMapperJob() */
    GoraMapper.initMapperJob(job, query, dataStore, NullWritable.class,
        NullWritable.class, QueryCounterMapper.class, true);

    job.setOutputFormatClass(NullOutputFormat.class);
    return job;
  }

  /**
   * Runs a counting job for {@code query} and returns the number of results.
   *
   * @param dataStore the store to read from
   * @param query the query whose results are counted
   * @return the number of rows the job processed
   * @throws IOException if the job does not complete successfully
   */
  public long countQuery(DataStore<K, T> dataStore, Query<K, T> query) throws Exception {
    Job job = createJob(dataStore, query);
    // A failed job leaves a partial (or zero) counter behind; do not report
    // that as if it were the real count.
    if (!job.waitForCompletion(true)) {
      throw new IOException("QueryCounter job did not complete successfully");
    }
    return job.getCounters().findCounter(COUNTER_GROUP, ROWS).getValue();
  }

  /**
   * Returns the number of results to the Query obtained by the
   * {@link #getQuery(DataStore)} method.
   *
   * @param dataStore the store to read from
   * @return the number of rows matched by {@link #getQuery(DataStore)}
   * @throws IOException if the job does not complete successfully
   */
  public long countQuery(DataStore<K, T> dataStore) throws Exception {
    return countQuery(dataStore, getQuery(dataStore));
  }

  /**
   * Command-line driver. Arguments: {@code <keyClass> <persistentClass>
   * [dataStoreClass]}. Without a data store class, DataStoreFactory's default
   * store is used. Prints the count and returns 0, or prints usage and returns 1
   * when fewer than two arguments are given.
   */
  @SuppressWarnings("unchecked")
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage QueryCounter <keyClass> <persistentClass> [dataStoreClass]");
      return 1;
    }

    Class<K> keyClass = (Class<K>) Class.forName(args[0]);
    Class<T> persistentClass = (Class<T>) Class.forName(args[1]);

    DataStore<K, T> dataStore;
    if (args.length > 2) {
      Class<? extends DataStore<K, T>> dataStoreClass
          = (Class<? extends DataStore<K, T>>) Class.forName(args[2]);
      dataStore = DataStoreFactory.getDataStore(dataStoreClass, keyClass, persistentClass);
    } else {
      dataStore = DataStoreFactory.getDataStore(keyClass, persistentClass);
    }

    long results = countQuery(dataStore);

    System.out.println("Number of result to the query:" + results);

    return 0;
  }

  // Raw QueryCounter: the key/persistent classes are only known at run time.
  @SuppressWarnings("rawtypes")
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new QueryCounter(new Configuration()), args);
    System.exit(ret);
  }
}
Added: incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java (added)
+++ incubator/gora/trunk/gora-core/src/examples/java/org/gora/examples/mapreduce/WordCount.java Fri Oct 8 21:17:10 2010
@@ -0,0 +1,152 @@

package org.gora.examples.mapreduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.gora.examples.generated.TokenDatum;
import org.gora.examples.generated.WebPage;
import org.gora.mapreduce.GoraMapper;
import org.gora.mapreduce.GoraReducer;
import org.gora.query.Query;
import org.gora.store.DataStore;
import org.gora.store.DataStoreFactory;

/**
 * Classic word count example in Gora.
 *
 * <p>Reads {@link WebPage}s from an input {@link DataStore}, counts the
 * whitespace-separated tokens of their content, and writes one
 * {@link TokenDatum} per distinct token, keyed by the token, to an output
 * {@link DataStore}.
 */
public class WordCount extends Configured implements Tool {

  public WordCount() {
  }

  public WordCount(Configuration conf) {
    setConf(conf);
  }

  /**
   * TokenizerMapper takes {@code <String, WebPage>} pairs as obtained
   * from the input DataStore, and tokenizes the content via
   * {@link WebPage#getContent()}. The tokens are emitted as
   * {@code <Text, IntWritable>} pairs, each with a count of one.
   */
  public static class TokenizerMapper
      extends GoraMapper<String, WebPage, Text, IntWritable> {

    /** Page bytes are decoded as UTF-8 rather than the platform default charset. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    private static final IntWritable ONE = new IntWritable(1);
    // Reused for every emitted token.
    private final Text word = new Text();

    @Override
    protected void map(String key, WebPage page, Context context)
        throws IOException, InterruptedException {

      // Get the content from a WebPage as obtained from the DataStore.
      ByteBuffer buffer = page.getContent();
      if (buffer == null) {
        return; // no content, nothing to count
      }
      // Copy only the readable region [position, limit). array() would expose
      // the whole backing array (ignoring offset and limit) and fails for
      // buffers without one. duplicate() leaves the page's buffer untouched.
      byte[] bytes = new byte[buffer.remaining()];
      buffer.duplicate().get(bytes);
      String content = new String(bytes, UTF8);

      StringTokenizer itr = new StringTokenizer(content);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  /**
   * Sums the counts emitted for each token and writes a {@link TokenDatum}
   * holding the total, keyed by the token string.
   */
  public static class WordCountReducer
      extends GoraReducer<Text, IntWritable, String, TokenDatum> {

    // Reused across reduce() calls.
    TokenDatum result = new TokenDatum();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.setCount(sum);
      context.write(key.toString(), result);
    }
  }

  /**
   * Creates and returns the {@link Job} for submitting to Hadoop MapReduce.
   *
   * @param inStore the store the web pages are read from
   * @param query the query selecting the pages to process
   * @param outStore the store the token counts are written to
   * @return the configured, not yet submitted, job
   * @throws IOException if the job cannot be created or configured
   */
  public Job createJob(DataStore<String, WebPage> inStore, Query<String, WebPage> query,
      DataStore<String, TokenDatum> outStore) throws IOException {
    Job job = new Job(getConf());

    job.setJobName("WordCount");

    job.setNumReduceTasks(10);
    job.setJarByClass(getClass());

    /* Mappers are initialized with GoraMapper#initMapperJob().
     * Instead of the TokenizerMapper defined here, if the input is not
     * obtained via Gora, any other mapper can be used, such as
     * Hadoop-MapReduce's WordCount.TokenizerMapper.
     */
    GoraMapper.initMapperJob(job, query, inStore, Text.class,
        IntWritable.class, TokenizerMapper.class, true);

    /* Reducers are initialized with GoraReducer#initReducerJob().
     * If the output is not to be persisted via Gora, any reducer
     * can be used instead.
     */
    GoraReducer.initReducerJob(job, outStore, WordCountReducer.class);

    //TODO: set combiner

    return job;
  }

  /**
   * Counts the words of every page in {@code inStore} into {@code outStore}.
   *
   * @return 0 if the job succeeded, 1 otherwise
   */
  public int wordCount(DataStore<String, WebPage> inStore,
      DataStore<String, TokenDatum> outStore)
      throws IOException, InterruptedException, ClassNotFoundException {
    Query<String, WebPage> query = inStore.newQuery();

    Job job = createJob(inStore, query, outStore);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  /**
   * Command-line driver. The first argument, if present, names the data store
   * class for both input and output; a second argument overrides the output
   * data store class. With no arguments DataStoreFactory's defaults are used.
   */
  @Override
  public int run(String[] args) throws Exception {

    DataStore<String, WebPage> inStore;
    DataStore<String, TokenDatum> outStore;

    if (args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.getDataStore(dataStoreClass,
          String.class, WebPage.class);
      if (args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.getDataStore(dataStoreClass,
          String.class, TokenDatum.class);
    } else {
      inStore = DataStoreFactory.getDataStore(String.class, WebPage.class);
      outStore = DataStoreFactory.getDataStore(String.class, TokenDatum.class);
    }

    return wordCount(inStore, outStore);
  }

  /**
   * Usage: {@code WordCount [<input datastore class> [<output datastore class>]]}
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new WordCount(), args);
    System.exit(ret);
  }

}
Added: incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/gora/avro/PersistentDatumReader.java Fri Oct 8 21:17:10 2010
@@ -0,0 +1,232 @@

package org.gora.avro;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
import org.gora.mapreduce.FakeResolvingDecoder;
import org.gora.persistency.ListGenericArray;
import org.gora.persistency.Persistent;
import org.gora.persistency.State;
import org.gora.persistency.StatefulHashMap;
import org.gora.persistency.StatefulMap;
import org.gora.persistency.impl.StateManagerImpl;
import org.gora.util.IOUtils;

/**
 * Avro datum reader for {@link Persistent} objects. When dirty bits are
 * enabled, it also reads the per-field dirty and readable flags of the
 * top-level record and the per-entry {@link State}s of maps, which are
 * serialized alongside the field values.
 */
public class PersistentDatumReader<T extends Persistent>
    extends SpecificDatumReader<T> {

  private Schema rootSchema;
  private T cachedPersistent; // for creating objects

  // One ResolvingDecoder wrapper per underlying Decoder.
  // NOTE(review): the FakeResolvingDecoder values presumably hold a strong
  // reference to their Decoder key, which would keep WeakHashMap entries from
  // ever being collected; confirm and consider WeakReference values.
  private WeakHashMap<Decoder, ResolvingDecoder> decoderCache
      = new WeakHashMap<Decoder, ResolvingDecoder>();

  private boolean readDirtyBits = true;

  public PersistentDatumReader() {
  }

  /**
   * @param schema the schema of the top-level records to read
   * @param readDirtyBits whether dirty/readable flags and map states were
   *        serialized with the data and must be read back
   */
  public PersistentDatumReader(Schema schema, boolean readDirtyBits) {
    this.readDirtyBits = readDirtyBits;
    setSchema(schema);
  }

  /** Records {@code actual} as the root schema used to detect top-level records. */
  @Override
  public void setSchema(Schema actual) {
    this.rootSchema = actual;
    super.setSchema(actual);
  }

  /**
   * Creates a new top-level persistent object. The first call creates a
   * prototype via {@code super.newRecord} and returns that object itself;
   * later calls return fresh instances obtained from the prototype's
   * {@code newInstance} with a new {@link StateManagerImpl}.
   */
  @SuppressWarnings("unchecked")
  public T newPersistent() {
    if (cachedPersistent == null) {
      cachedPersistent = (T) super.newRecord(null, rootSchema);
      return cachedPersistent; // we can return the cached object
    }
    return (T) cachedPersistent.newInstance(new StateManagerImpl());
  }

  /** Reuses {@code old} when given; top-level records come from {@link #newPersistent()}. */
  @Override
  protected Object newRecord(Object old, Schema schema) {
    if (old != null) {
      return old;
    }

    if (schema.equals(rootSchema)) {
      return newPersistent();
    } else {
      return super.newRecord(old, schema);
    }
  }

  /** Reads one top-level record using the root schema. */
  @Override
  @SuppressWarnings("unchecked")
  public T read(T reuse, Decoder in) throws IOException {
    return (T) read(reuse, rootSchema, in);
  }

  /** Reads a datum of {@code schema}, wrapping {@code decoder} in a cached ResolvingDecoder. */
  public Object read(Object reuse, Schema schema, Decoder decoder)
      throws IOException {
    return super.read(reuse, schema, getResolvingDecoder(decoder));
  }

  /** Returns the cached wrapper for {@code decoder}, creating one on first use. */
  protected ResolvingDecoder getResolvingDecoder(Decoder decoder)
      throws IOException {
    ResolvingDecoder resolvingDecoder = decoderCache.get(decoder);
    if (resolvingDecoder == null) {
      resolvingDecoder = new FakeResolvingDecoder(rootSchema, decoder);
      decoderCache.put(decoder, resolvingDecoder);
    }
    return resolvingDecoder;
  }

  /**
   * Reads a record. For the top-level record with dirty bits enabled, the
   * dirty and readable flag arrays are read first, only readable fields are
   * read, and the serialized dirty flags are restored on the object.
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Object readRecord(Object old, Schema expected, ResolvingDecoder in)
      throws IOException {

    Object record = newRecord(old, expected);

    // check if top-level
    if (expected.equals(rootSchema) && readDirtyBits) {
      T persistent = (T) record;
      persistent.clear();

      boolean[] dirtyFields = IOUtils.readBoolArray(in);
      boolean[] readableFields = IOUtils.readBoolArray(in);

      // read fields: only those flagged readable are present in the stream
      for (Field f : expected.getFields()) {
        if (readableFields[f.pos()]) {
          int pos = f.pos();
          String name = f.name();
          Object oldDatum = (old != null) ? getField(record, name, pos) : null;
          setField(record, name, pos, read(oldDatum, f.schema(), in));
        }
      }

      // Restore the serialized dirty bits exactly. setField() goes through the
      // record's put(), which may itself mark fields dirty, so fields that were
      // clean when written must be cleared explicitly too.
      for (int i = 0; i < dirtyFields.length; i++) {
        if (dirtyFields[i]) {
          persistent.setDirty(i);
        } else {
          persistent.clearDirty(i);
        }
      }
      return record;
    } else {
      // Since ResolvingDecoder.readFieldOrder is final, we cannot override it,
      // so this is a copy of super.readRecord that iterates the schema's
      // fields in order instead of calling readFieldOrder().
      for (Field f : expected.getFields()) {
        int pos = f.pos();
        String name = f.name();
        Object oldDatum = (old != null) ? getField(record, name, pos) : null;
        setField(record, name, pos, read(oldDatum, f.schema(), in));
      }

      return record;
    }
  }

  /**
   * Reads a map. With dirty bits enabled, the entry states written before the
   * map contents are read first and applied once the contents are read;
   * otherwise the resulting map has no entry states.
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Object readMap(Object old, Schema expected, ResolvingDecoder in)
      throws IOException {

    StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) newMap(old, 0);

    // Buffer the states instead of storing them on the map right away:
    // super.readMap() calls newMap() again, which clears the states of a
    // StatefulHashMap, so states stored now would be lost.
    Map<Utf8, State> states = null;
    if (readDirtyBits) {
      int size = in.readInt();
      states = new HashMap<Utf8, State>();
      for (int j = 0; j < size; j++) {
        Utf8 key = in.readString(null);
        State state = State.values()[in.readInt()];
        states.put(key, state);
      }
    }

    StatefulMap<Utf8, ?> result =
        (StatefulMap<Utf8, ?>) super.readMap(map, expected, in);

    // Drop any states set while populating the map, then apply the read ones.
    result.clearStates();
    if (states != null) {
      for (Map.Entry<Utf8, State> entry : states.entrySet()) {
        result.putState(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }

  /** Reuses {@code old} (emptied, states cleared) if it is a StatefulHashMap. */
  @Override
  @SuppressWarnings({ "rawtypes" })
  protected Object newMap(Object old, int size) {
    if (old instanceof StatefulHashMap) {
      ((Map) old).clear();
      ((StatefulHashMap) old).clearStates();
      return old;
    }
    return new StatefulHashMap();
  }

  /** Called to create new array instances. Subclasses may override to use a
   * different array implementation. This implementation reuses {@code old}
   * (cleared) if it is a {@link ListGenericArray} and otherwise creates a new
   * ListGenericArray rather than a {@link GenericData.Array}. */
  @Override
  @SuppressWarnings("rawtypes")
  protected Object newArray(Object old, int size, Schema schema) {
    if (old instanceof ListGenericArray) {
      ((GenericArray) old).clear();
      return old;
    } else {
      return new ListGenericArray(size, schema);
    }
  }

  /**
   * Copies {@code persistent} field by field into a new instance. Map, array,
   * record and string fields are copied via {@link #cloneObject}; NULL fields
   * are skipped; other fields are shared.
   */
  public Persistent clone(Persistent persistent, Schema schema) {
    Persistent cloned = persistent.newInstance(new StateManagerImpl());
    List<Field> fields = schema.getFields();
    for (Field field : fields) {
      int pos = field.pos();
      switch (field.schema().getType()) {
        case MAP    :
        case ARRAY  :
        case RECORD :
        case STRING : cloned.put(pos, cloneObject(
            field.schema(), persistent.get(pos), cloned.get(pos))); break;
        case NULL   : break;
        default     : cloned.put(pos, persistent.get(pos)); break;
      }
    }

    return cloned;
  }

  /**
   * Copies {@code toClone} according to {@code schema}, reusing {@code cloned}
   * as the container for maps and arrays where possible. Returns null for null.
   */
  @SuppressWarnings("unchecked")
  protected Object cloneObject(Schema schema, Object toClone, Object cloned) {
    if (toClone == null) {
      return null;
    }

    switch (schema.getType()) {
      case MAP    :
        Map<Utf8, Object> map = (Map<Utf8, Object>) newMap(cloned, 0);
        for (Map.Entry<Utf8, Object> entry : ((Map<Utf8, Object>) toClone).entrySet()) {
          map.put((Utf8) createString(entry.getKey().toString()),
              cloneObject(schema.getValueType(), entry.getValue(), null));
        }
        return map;
      case ARRAY  :
        GenericArray<Object> array = (GenericArray<Object>)
            newArray(cloned, (int) ((GenericArray<?>) toClone).size(), schema);
        for (Object element : (GenericArray<Object>) toClone) {
          array.add(cloneObject(schema.getElementType(), element, null));
        }
        return array;
      case RECORD : return clone((Persistent) toClone, schema);
      case STRING : return createString(toClone.toString());
      default     : return toClone; // shallow copy is enough
    }
  }
}