incubator-gora-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Henry Saputra <henry.sapu...@gmail.com>
Subject Re: svn commit: r1149420 - in /incubator/gora/trunk: gora-cassandra/ivy/ gora-cassandra/lib-ext/ gora-cassandra/src/main/java/org/apache/gora/cassandra/client/ gora-cassandra/src/main/java/org/apache/gora/cassandra/query/ gora-cassandra/src/main/java
Date Fri, 22 Jul 2011 00:39:16 GMT
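Looks like some files are missing the Apache license header: the newly added files (e.g. CassandraColumn.java, CassandraRow.java, CassandraResultSet.java, CassandraSubColumn.java, CassandraSuperColumn.java) have none, and the header was removed from CassandraQuery.java and CassandraResult.java.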

- Henry

On Thu, Jul 21, 2011 at 5:34 PM,  <alexis@apache.org> wrote:
> Author: alexis
> Date: Fri Jul 22 00:33:59 2011
> New Revision: 1149420
>
> URL: http://svn.apache.org/viewvc?rev=1149420&view=rev
> Log:
> Cassandra 0.8 backend with Hector client
>
> Added:
>    incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar   (with props)
>    incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar   (with props)
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
> Removed:
>    incubator/gora/trunk/gora-cassandra/lib-ext/apache-cassandra-0.6.4.jar
>    incubator/gora/trunk/gora-cassandra/lib-ext/libthrift-r917130.jar
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/client/
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraColumn.java
> Modified:
>    incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
>    incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
>    incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
>
> Modified: incubator/gora/trunk/gora-cassandra/ivy/ivy.xml
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/ivy/ivy.xml?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/ivy/ivy.xml (original)
> +++ incubator/gora/trunk/gora-cassandra/ivy/ivy.xml Fri Jul 22 00:33:59 2011
> @@ -22,7 +22,7 @@
>       organisation="org.apache.gora"
>       module="gora-cassandra"
>       status="integration"/>
> -
> +
>   <configurations>
>     <include file="${project.dir}/ivy/ivy-configurations.xml"/>
>   </configurations>
> @@ -32,15 +32,25 @@
>     <artifact name="gora-cassandra-test" conf="test"/>
>   </publications>
>
> +
>   <dependencies>
>     <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
> -    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
> -    <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
> -
> -    <dependency org="org.slf4j" name="slf4j-jdk14" rev="1.5.8" conf="*->master"/>
> -    <dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" conf="*->master"/>
> -
> -    <dependency org="com.google.guava" name="guava" rev="r06"/>
> +
> +    <dependency org="org.apache.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/>
> +
> +    <dependency org="org.jdom" name="jdom" rev="1.1">
> +       <exclude org="xerces" name="xercesImpl"/>
> +    </dependency>
> +
> +    <!--
> +        <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.8.1"/>
> +       <dependency org="me.prettyprint" name="hector" rev="0.8.0-1"/>
> +    -->
> +    <dependency org="org.apache.cassandra" name="cassandra-thrift" rev="0.8.1"/>
> +    <dependency org="com.ecyrd.speed4j" name="speed4j" rev="0.9" conf="*->*,!javadoc,!sources"/>
> +    <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="1.1.2" conf="*->*,!javadoc,!sources"/>
> +    <dependency org="com.google.collections" name="google-collections" rev="1.0" conf="*->*,!javadoc,!sources"/>
> +    <dependency org="com.google.guava" name="guava" rev="r09" conf="*->*,!javadoc,!sources"/>
>
>     <!-- test dependencies -->
>
>
> Added: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar?rev=1149420&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/cassandra-all-0.8.0.jar
> ------------------------------------------------------------------------------
>    svn:mime-type = application/octet-stream
>
> Added: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar?rev=1149420&view=auto
> ==============================================================================
> Binary file - no diff available.
>
> Propchange: incubator/gora/trunk/gora-cassandra/lib-ext/hector-core-0.8.0-1.jar
> ------------------------------------------------------------------------------
>    svn:mime-type = application/octet-stream
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,41 @@
> +package org.apache.gora.cassandra.query;
> +
> +import org.apache.avro.Schema.Field;
> +
> +
> +/**
> + * Represents a unit of data: a key value pair tagged by a family name
> + */
> +public abstract class CassandraColumn {
> +  public static final int SUB = 0;
> +  public static final int SUPER = 1;
> +
> +  private String family;
> +  private int type;
> +  private Field field;
> +
> +  public String getFamily() {
> +    return family;
> +  }
> +  public void setFamily(String family) {
> +    this.family = family;
> +  }
> +  public int getType() {
> +    return type;
> +  }
> +  public void setType(int type) {
> +    this.type = type;
> +  }
> +  public void setField(Field field) {
> +    this.field = field;
> +  }
> +
> +  protected Field getField() {
> +    return this.field;
> +  }
> +
> +  public abstract String getName();
> +  public abstract Object getValue();
> +
> +
> +}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraPartitionQuery.java Fri Jul 22 00:33:59 2011
> @@ -1,93 +0,0 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -package org.apache.gora.cassandra.query;
> -
> -import java.io.DataInput;
> -import java.io.DataOutput;
> -import java.io.IOException;
> -
> -import org.apache.gora.persistency.Persistent;
> -import org.apache.gora.query.Query;
> -import org.apache.gora.query.impl.PartitionQueryImpl;
> -import org.apache.hadoop.io.Text;
> -
> -public class CassandraPartitionQuery<K, T extends Persistent>
> -extends PartitionQueryImpl<K, T> {
> -
> -  private String startToken;
> -
> -  private String endToken;
> -
> -  private String[] endPoints;
> -
> -  private int splitSize;
> -
> -  public CassandraPartitionQuery() {
> -    this.dataStore = null;
> -  }
> -
> -  public CassandraPartitionQuery(Query<K, T> baseQuery, String startToken, String endToken, String[] endPoints,
> -      int splitSize) {
> -    super(baseQuery);
> -    this.startToken = startToken;
> -    this.endToken = endToken;
> -    this.endPoints = endPoints;
> -    this.splitSize = splitSize;
> -  }
> -
> -  @Override
> -  public void write(DataOutput out) throws IOException {
> -    super.write(out);
> -    Text.writeString(out, startToken);
> -    Text.writeString(out, endToken);
> -    out.writeInt(endPoints.length);
> -    for (String endPoint : endPoints) {
> -      Text.writeString(out, endPoint);
> -    }
> -    out.writeInt(splitSize);
> -  }
> -
> -  @Override
> -  public void readFields(DataInput in) throws IOException {
> -    super.readFields(in);
> -    startToken = Text.readString(in);
> -    endToken = Text.readString(in);
> -    int size = in.readInt();
> -    endPoints = new String[size];
> -    for (int i = 0; i < size; i++) {
> -      endPoints[i] = Text.readString(in);
> -    }
> -    splitSize = in.readInt();
> -  }
> -
> -  public String getStartToken() {
> -    return startToken;
> -  }
> -
> -  public String getEndToken() {
> -    return endToken;
> -  }
> -
> -  public String[] getEndPoints() {
> -    return endPoints;
> -  }
> -
> -  public int getSplitSize() {
> -    return splitSize;
> -  }
> -}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java Fri Jul 22 00:33:59 2011
> @@ -1,34 +1,55 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.query;
>
> +import java.util.List;
> +import java.util.Map;
> +
>  import org.apache.gora.persistency.Persistent;
> +import org.apache.gora.query.Query;
>  import org.apache.gora.query.impl.QueryBase;
>  import org.apache.gora.store.DataStore;
>
> -public class CassandraQuery<K, T extends Persistent>
> -extends QueryBase<K, T> {
> +public class CassandraQuery<K, T extends Persistent> extends QueryBase<K, T> {
>
> +  private Query<K, T> query;
> +
> +  /**
> +   * Maps Avro fields to Cassandra columns.
> +   */
> +  private Map<String, List<String>> familyMap;
> +
>   public CassandraQuery() {
>     super(null);
>   }
> -
>   public CassandraQuery(DataStore<K, T> dataStore) {
>     super(dataStore);
>   }
> +  public void setFamilyMap(Map<String, List<String>> familyMap) {
> +    this.familyMap = familyMap;
> +  }
> +  public Map<String, List<String>> getFamilyMap() {
> +    return familyMap;
> +  }
> +
> +  /**
> +   * @param family the family name
> +   * @return an array of the query column names belonging to the family
> +   */
> +  public String[] getColumns(String family) {
> +
> +    List<String> columnList = familyMap.get(family);
> +    String[] columns = new String[columnList.size()];
> +    for (int i = 0; i < columns.length; ++i) {
> +      columns[i] = columnList.get(i);
> +    }
> +    return columns;
> +  }
> +  public Query<K, T> getQuery() {
> +    return query;
> +  }
> +  public void setQuery(Query<K, T> query) {
> +    this.query = query;
> +  }
> +
> +
> +
>  }
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Fri Jul 22 00:33:59 2011
> @@ -1,157 +1,97 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.query;
>
>  import java.io.IOException;
> -import java.net.InetAddress;
> -import java.net.UnknownHostException;
> -import java.util.Iterator;
> -
> -import org.apache.gora.cassandra.client.CassandraClient;
> -import org.apache.gora.cassandra.client.Row;
> -import org.apache.gora.cassandra.client.Select;
> -import org.apache.gora.cassandra.store.CassandraStore;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
>  import org.apache.gora.persistency.Persistent;
>  import org.apache.gora.query.Query;
>  import org.apache.gora.query.impl.ResultBase;
>  import org.apache.gora.store.DataStore;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> -public class CassandraResult<K, T extends Persistent>
> -extends ResultBase<K, T> {
> -
> -  private Iterator<Row> rowIter;
> -
> -  private CassandraStore<K, T> store;
> +public class CassandraResult<K, T extends Persistent> extends ResultBase<K, T> {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraResult.class);
> +
> +  private int rowNumber;
> +
> +  private CassandraResultSet cassandraResultSet;
> +
> +  /**
> +   * Maps Cassandra columns to Avro fields.
> +   */
> +  private Map<String, String> reverseMap;
>
> -  private String[] fields;
> -
> -  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query,
> -      int batchRowCount) throws IOException {
> +  public CassandraResult(DataStore<K, T> dataStore, Query<K, T> query) {
>     super(dataStore, query);
> -
> -    store = (CassandraStore<K, T>) dataStore;
> -    fields = query.getFields();
> -
> -    boolean isUsingTokens = (query instanceof CassandraPartitionQuery);
> -    String startTokenOrKey;
> -    String endTokenOrKey;
> -
> -    if (isUsingTokens) {
> -      CassandraPartitionQuery<K, T> partitionQuery = (CassandraPartitionQuery<K, T>) query;
> -      startTokenOrKey = partitionQuery.getStartToken();
> -      endTokenOrKey = partitionQuery.getEndToken();
> -    } else {
> -      CassandraQuery<K, T> cassandraQuery = (CassandraQuery<K, T>) query;
> -      startTokenOrKey = cassandraQuery.getStartKey().toString();
> -      endTokenOrKey = cassandraQuery.getEndKey().toString();
> -    }
> -
> -    Select select = store.createSelect(fields);
> -
> -    CassandraClient client = store.getClientByLocation(getLocation(query));
> -    if (isUsingTokens) {
> -      rowIter =
> -        client.getTokenRange(startTokenOrKey, endTokenOrKey,
> -            batchRowCount, select).iterator();
> -    } else {
> -      rowIter = client.getRange(startTokenOrKey, endTokenOrKey,
> -          batchRowCount, select).iterator();
> -    }
> -  }
> -
> -  @Override
> -  public float getProgress() throws IOException {
> -    return 0;
>   }
>
>   @Override
>   protected boolean nextInner() throws IOException {
> -    if (!rowIter.hasNext()) {
> -      return false;
> -    }
> -    Row row = rowIter.next();
> -    if (row == null) {
> -      return false;
> +    if (this.rowNumber < this.cassandraResultSet.size()) {
> +      updatePersistent();
>     }
> -
> -    key = toKey(row.getKey());
> -    persistent = store.newInstance(row, fields);
> -
> -    return true;
> +    ++this.rowNumber;
> +    return (this.rowNumber <= this.cassandraResultSet.size());
>   }
>
> +
> +  /**
> +   * Load key/value pair from Cassandra row to Avro record.
> +   * @throws IOException
> +   */
>   @SuppressWarnings("unchecked")
> -  private K toKey(String keyStr) {
> -    Class<K> keyClass = dataStore.getKeyClass();
> -    if (keyClass.isAssignableFrom(String.class)) {
> -      return (K) keyStr;
> -    }
> -    if (keyClass.isAssignableFrom(Integer.class)) {
> -      return (K) (Integer) Integer.parseInt(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Float.class)) {
> -      return (K) (Float) Float.parseFloat(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Double.class)) {
> -      return (K) (Double) Double.parseDouble(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Long.class)) {
> -      return (K) (Long) Long.parseLong(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Short.class)) {
> -      return (K) (Short) Short.parseShort(keyStr);
> -    }
> -    if (keyClass.isAssignableFrom(Byte.class)) {
> -      return (K) (Byte) Byte.parseByte(keyStr);
> +  private void updatePersistent() throws IOException {
> +    CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
> +
> +    // load key
> +    this.key = (K) cassandraRow.getKey();
> +
> +    // load value
> +    Schema schema = this.persistent.getSchema();
> +    List<Field> fields = schema.getFields();
> +
> +    for (CassandraColumn cassandraColumn: cassandraRow) {
> +
> +      // get field name
> +      String family = cassandraColumn.getFamily();
> +      String fieldName = this.reverseMap.get(family + ":" + cassandraColumn.getName());
> +
> +      // get field
> +      int pos = this.persistent.getFieldIndex(fieldName);
> +      Field field = fields.get(pos);
> +
> +      // get value
> +      cassandraColumn.setField(field);
> +      Object value = cassandraColumn.getValue();
> +
> +      this.persistent.put(pos, value);
> +      // this field does not need to be written back to the store
> +      this.persistent.clearDirty(pos);
>     }
>
> -    throw new RuntimeException("Can't parse " + keyStr +
> -                               " as an instance of " + keyClass);
>   }
>
>   @Override
> -  public void close() throws IOException { }
> +  public void close() throws IOException {
> +    // TODO Auto-generated method stub
> +
> +  }
>
> -  private String getLocation(Query<K, T> query) {
> -    if (!(query instanceof CassandraPartitionQuery)) {
> -      return null;
> -    }
> -    CassandraPartitionQuery<K, T> partitonQuery =
> -      (CassandraPartitionQuery<K, T>) query;
> -    InetAddress[] localAddresses = new InetAddress[0];
> -    try {
> -      localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
> -    } catch (UnknownHostException e) {
> -      throw new AssertionError(e);
> -    }
> -    for (InetAddress address : localAddresses) {
> -      for (String location : partitonQuery.getEndPoints()) {
> -        InetAddress locationAddress = null;
> -        try {
> -          locationAddress = InetAddress.getByName(location);
> -        } catch (UnknownHostException e) {
> -          throw new AssertionError(e);
> -        }
> -        if (address.equals(locationAddress)) {
> -          return location;
> -        }
> -      }
> -    }
> -    return partitonQuery.getEndPoints()[0];
> +  @Override
> +  public float getProgress() throws IOException {
> +    return (((float) this.rowNumber) / this.cassandraResultSet.size());
>   }
> -}
> \ No newline at end of file
> +
> +  public void setResultSet(CassandraResultSet cassandraResultSet) {
> +    this.cassandraResultSet = cassandraResultSet;
> +  }
> +
> +  public void setReverseMap(Map<String, String> reverseMap) {
> +    this.reverseMap = reverseMap;
> +  }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,36 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +
> +/**
> + * List data structure to keep the order coming from the Cassandra selects.
> + */
> +public class CassandraResultSet extends ArrayList<CassandraRow> {
> +
> +  /**
> +   *
> +   */
> +  private static final long serialVersionUID = -7620939600192859652L;
> +
> +  /**
> +   * Maps keys to indices in the list.
> +   */
> +  private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
> +
> +  public CassandraRow getRow(String key) {
> +    Integer integer = this.indexMap.get(key);
> +    if (integer == null) {
> +      return null;
> +    }
> +
> +    return this.get(integer);
> +  }
> +
> +  public void putRow(String key, CassandraRow cassandraRow) {
> +    this.add(cassandraRow);
> +    this.indexMap.put(key, this.size()-1);
> +  }
> +
> +
> +}
> \ No newline at end of file
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,24 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.ArrayList;
> +
> +/**
> + * List of key value pairs representing a row, tagged by a key.
> + */
> +public class CassandraRow extends ArrayList<CassandraColumn> {
> +
> +  /**
> +   *
> +   */
> +  private static final long serialVersionUID = -7620939600192859652L;
> +  private String key;
> +
> +  public String getKey() {
> +    return this.key;
> +  }
> +
> +  public void setKey(String key) {
> +    this.key = key;
> +  }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,104 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.nio.ByteBuffer;
> +import java.nio.CharBuffer;
> +import java.nio.charset.CharacterCodingException;
> +import java.nio.charset.Charset;
> +import java.nio.charset.CharsetEncoder;
> +
> +import me.prettyprint.hector.api.beans.HColumn;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> +import org.apache.avro.Schema.Type;
> +import org.apache.avro.generic.GenericArray;
> +import org.apache.avro.generic.GenericData;
> +import org.apache.avro.util.Utf8;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraSubColumn extends CassandraColumn {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
> +
> +  private static final String ENCODING = "UTF-8";
> +
> +  private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();;
> +
> +
> +  /**
> +   * Key-value pair containing the raw data.
> +   */
> +  private HColumn<String, String> hColumn;
> +
> +  public String getName() {
> +    return hColumn.getName();
> +  }
> +
> +  /**
> +   * Deserialize a String into an typed Object, according to the field schema.
> +   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
> +   */
> +  public Object getValue() {
> +    Field field = getField();
> +    Schema fieldSchema = field.schema();
> +    Type type = fieldSchema.getType();
> +    String valueString = hColumn.getValue();
> +    Object value = null;
> +
> +    switch (type) {
> +      case STRING:
> +        value = new Utf8(valueString);
> +        break;
> +      case BYTES:
> +        // convert string to bytebuffer
> +        value = getByteBuffer(valueString);
> +        break;
> +      case INT:
> +        value = Integer.parseInt(valueString);
> +        break;
> +      case LONG:
> +        value = Long.parseLong(valueString);
> +        break;
> +      case FLOAT:
> +        value = Float.parseFloat(valueString);
> +        break;
> +      case ARRAY:
> +        // convert string to array
> +        valueString = valueString.substring(1, valueString.length()-1);
> +        String[] elements = valueString.split(", ");
> +
> +        Type elementType = fieldSchema.getElementType().getType();
> +        if (elementType == Schema.Type.STRING) {
> +          // the array type is String
> +          GenericArray<String> genericArray = new GenericData.Array<String>(elements.length, Schema.createArray(Schema.create(Schema.Type.STRING)));
> +          for (String element: elements) {
> +            genericArray.add(element);
> +          }
> +
> +          value = genericArray;
> +        } else {
> +          LOG.info("Element type not supported: " + elementType);
> +        }
> +        break;
> +      default:
> +        LOG.info("Type not supported: " + type);
> +    }
> +
> +    return value;
> +
> +  }
> +
> +  public void setValue(HColumn<String, String> hColumn) {
> +    this.hColumn = hColumn;
> +  }
> +
> +  public static ByteBuffer getByteBuffer(String valueString) {
> +    ByteBuffer byteBuffer = null;
> +    try {
> +      byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
> +    } catch (CharacterCodingException cce) {
> +      LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
> +    }
> +    return byteBuffer;
> +  }
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,102 @@
> +package org.apache.gora.cassandra.query;
> +
> +import java.util.Map;
> +
> +import me.prettyprint.hector.api.beans.HColumn;
> +import me.prettyprint.hector.api.beans.HSuperColumn;
> +
> +import org.apache.avro.Schema;
> +import org.apache.avro.Schema.Field;
> +import org.apache.avro.Schema.Type;
> +import org.apache.avro.util.Utf8;
> +import org.apache.gora.persistency.StatefulHashMap;
> +import org.apache.gora.persistency.impl.PersistentBase;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraSuperColumn extends CassandraColumn {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraSuperColumn.class);
> +
> +  private HSuperColumn<String, String, String> hSuperColumn;
> +
> +  public String getName() {
> +    return hSuperColumn.getName();
> +  }
> +
> +  public Object getValue() {
> +    Field field = getField();
> +    Schema fieldSchema = field.schema();
> +    Type type = fieldSchema.getType();
> +
> +    Object value = null;
> +
> +    switch (type) {
> +      case MAP:
> +        Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
> +        Type valueType = fieldSchema.getValueType().getType();
> +
> +        for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
> +          String memberString = hColumn.getValue();
> +          Object memberValue = null;
> +          switch (valueType) {
> +            case STRING:
> +              memberValue = new Utf8(memberString);
> +              break;
> +            case BYTES:
> +              memberValue = CassandraSubColumn.getByteBuffer(memberString);
> +              break;
> +            default:
> +              LOG.info("Type for the map value is not supported: " + valueType);
> +
> +          }
> +          map.put(new Utf8(hColumn.getName()), memberValue);
> +        }
> +        value = map;
> +
> +        break;
> +      case RECORD:
> +        String fullName = fieldSchema.getFullName();
> +
> +        Class<?> claz = null;
> +        try {
> +          claz = Class.forName(fullName);
> +        } catch (ClassNotFoundException cnfe) {
> +          LOG.warn("Unable to load class " + fullName, cnfe);
> +          break;
> +        }
> +
> +        try {
> +          value = claz.newInstance();
> +        } catch (InstantiationException ie) {
> +          LOG.warn("Instantiation error", ie);
> +          break;
> +        } catch (IllegalAccessException iae) {
> +          LOG.warn("Illegal access error", iae);
> +          break;
> +        }
> +
> +        // we updated the value instance, now update its members
> +        if (value instanceof PersistentBase) {
> +          PersistentBase record = (PersistentBase) value;
> +
> +          for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns()) {
> +            Field memberField = fieldSchema.getField(hColumn.getName());
> +            CassandraSubColumn cassandraColumn = new CassandraSubColumn();
> +            cassandraColumn.setField(memberField);
> +            cassandraColumn.setValue(hColumn);
> +            record.put(record.getFieldIndex(hColumn.getName()), cassandraColumn.getValue());
> +          }
> +        }
> +        break;
> +      default:
> +        LOG.info("Type not supported: " + type);
> +    }
> +
> +    return value;
> +  }
> +
> +  public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
> +    this.hSuperColumn = hSuperColumn;
> +  }
> +
> +}
>
> Added: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1149420&view=auto
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (added)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Fri Jul 22 00:33:59 2011
> @@ -0,0 +1,253 @@
> +package org.apache.gora.cassandra.store;
> +
> +import java.nio.ByteBuffer;
> +import java.util.ArrayList;
> +import java.util.Arrays;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +
> +import me.prettyprint.cassandra.serializers.StringSerializer;
> +import me.prettyprint.cassandra.service.CassandraHostConfigurator;
> +import me.prettyprint.hector.api.Cluster;
> +import me.prettyprint.hector.api.Keyspace;
> +import me.prettyprint.hector.api.Serializer;
> +import me.prettyprint.hector.api.beans.OrderedRows;
> +import me.prettyprint.hector.api.beans.OrderedSuperRows;
> +import me.prettyprint.hector.api.beans.Row;
> +import me.prettyprint.hector.api.beans.SuperRow;
> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
> +import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
> +import me.prettyprint.hector.api.factory.HFactory;
> +import me.prettyprint.hector.api.mutation.Mutator;
> +import me.prettyprint.hector.api.query.QueryResult;
> +import me.prettyprint.hector.api.query.RangeSlicesQuery;
> +import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
> +
> +import org.apache.gora.cassandra.query.CassandraQuery;
> +import org.apache.gora.mapreduce.GoraRecordReader;
> +import org.apache.gora.persistency.Persistent;
> +import org.apache.gora.query.Query;
> +import org.apache.gora.util.ByteUtils;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +public class CassandraClient<K, T extends Persistent> {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
> +
> +  private Cluster cluster;
> +  private Keyspace keyspace;
> +  private Mutator<String> mutator;
> +
> +  private CassandraMapping cassandraMapping = new CassandraMapping();
> +
> +  private Serializer<String> stringSerializer = new StringSerializer();
> +
> +  public void init() throws Exception {
> +    this.cassandraMapping.loadConfiguration();
> +    this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName()));
> +
> +    // add keyspace to cluster
> +    checkKeyspace();
> +
> +    // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families.
> +    this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
> +
> +    this.mutator = HFactory.createMutator(this.keyspace, this.stringSerializer);
> +  }
> +
> +  /**
> +   * Check if keyspace already exists. If not, create it.
> +   */
> +  public void checkKeyspace() {
> +    // "describe keyspace <keyspaceName>;" query
> +    KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
> +    if (keyspaceDefinition == null) {
> +      List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions();
> +      keyspaceDefinition = HFactory.createKeyspaceDefinition(this.cassandraMapping.getKeyspaceName(), "org.apache.cassandra.locator.SimpleStrategy", 1, columnFamilyDefinitions);
> +      this.cluster.addKeyspace(keyspaceDefinition);
> +      LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'");
> +
> +      keyspaceDefinition = null;
> +    }
> +
> +
> +  }
> +
> +  /**
> +   * Drop keyspace.
> +   */
> +  public void dropKeyspace() {
> +    // "drop keyspace <keyspaceName>;" query
> +    this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
> +  }
> +
> +  /**
> +   * Insert a field in a column.
> +   * @param key the row key
> +   * @param fieldName the field name
> +   * @param value the field value.
> +   */
> +  public void addColumn(String key, String fieldName, Object value) {
> +    if (value == null) {
> +      return;
> +    }
> +    if (value instanceof ByteBuffer) {
> +      value = toString((ByteBuffer) value);
> +    }
> +
> +    String columnFamily = this.cassandraMapping.getFamily(fieldName);
> +    String columnName = this.cassandraMapping.getColumn(fieldName);
> +
> +    this.mutator.insert(key, columnFamily, HFactory.createStringColumn(columnName, value.toString()));
> +  }
> +
> +  /**
> +   * TODO do no convert bytes to string to store a binary field
> +   * @param value
> +   * @return
> +   */
> +  private static String toString(ByteBuffer value) {
> +    ByteBuffer byteBuffer = (ByteBuffer) value;
> +    return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
> +  }
> +
> +  /**
> +   * Insert a member in a super column. This is used for map and record Avro types.
> +   * @param key the row key
> +   * @param fieldName the field name
> +   * @param memberName the member name
> +   * @param value the member value
> +   */
> +  public void addSubColumn(String key, String fieldName, String memberName, Object value) {
> +    if (value == null) {
> +      return;
> +    }
> +
> +    if (value instanceof ByteBuffer) {
> +      value = toString((ByteBuffer) value);
> +    }
> +
> +    String columnFamily = this.cassandraMapping.getFamily(fieldName);
> +    String superColumnName = this.cassandraMapping.getColumn(fieldName);
> +
> +    this.mutator.insert(key, columnFamily, HFactory.createSuperColumn(superColumnName, Arrays.asList(HFactory.createStringColumn(memberName, value.toString())), this.stringSerializer, this.stringSerializer, this.stringSerializer));
> +
> +  }
> +
> +  /**
> +   * Select a family column in the keyspace.
> +   * @param cassandraQuery a wrapper of the query
> +   * @param family the family name to be queried
> +   * @return a list of family rows
> +   */
> +  public List<Row<String, String, String>> execute(CassandraQuery<K, T> cassandraQuery, String family) {
> +
> +    String[] columnNames = cassandraQuery.getColumns(family);
> +    Query<K, T> query = cassandraQuery.getQuery();
> +    int limit = (int) query.getLimit();
> +    String startKey = (String) query.getStartKey();
> +    String endKey = (String) query.getEndKey();
> +
> +    if (startKey == null) {
> +      startKey = "";
> +    }
> +    if (endKey == null) {
> +      endKey = "";
> +    }
> +
> +
> +    RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer, stringSerializer, stringSerializer);
> +    rangeSlicesQuery.setColumnFamily(family);
> +    rangeSlicesQuery.setKeys(startKey, endKey);
> +    rangeSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
> +    rangeSlicesQuery.setRowCount(limit);
> +    rangeSlicesQuery.setColumnNames(columnNames);
> +
> +
> +    QueryResult<OrderedRows<String, String, String>> queryResult = rangeSlicesQuery.execute();
> +    OrderedRows<String, String, String> orderedRows = queryResult.get();
> +
> +
> +    return orderedRows.getList();
> +  }
> +
> +  /**
> +   * Select the families that contain at least one column mapped to a query field.
> +   * @param query indicates the columns to select
> +   * @return a map which keys are the family names and values the corresponding column names required to get all the query fields.
> +   */
> +  public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
> +    Map<String, List<String>> map = new HashMap<String, List<String>>();
> +    for (String field: query.getFields()) {
> +      String family = this.cassandraMapping.getFamily(field);
> +      String column = this.cassandraMapping.getColumn(field);
> +
> +      // check if the family value was already initialized
> +      List<String> list = map.get(family);
> +      if (list == null) {
> +        list = new ArrayList<String>();
> +        map.put(family, list);
> +      }
> +
> +      if (column != null) {
> +        list.add(column);
> +      }
> +
> +    }
> +
> +    return map;
> +  }
> +
> +  /**
> +   * Select the field names according to the column names, which format if fully qualified: "family:column"
> +   * @param query
> +   * @return a map which keys are the fully qualified column names and values the query fields
> +   */
> +  public Map<String, String> getReverseMap(Query<K, T> query) {
> +    Map<String, String> map = new HashMap<String, String>();
> +    for (String field: query.getFields()) {
> +      String family = this.cassandraMapping.getFamily(field);
> +      String column = this.cassandraMapping.getColumn(field);
> +
> +      map.put(family + ":" + column, field);
> +    }
> +
> +    return map;
> +
> +  }
> +
> +  public boolean isSuper(String family) {
> +    return this.cassandraMapping.isSuper(family);
> +  }
> +
> +  public List<SuperRow<String, String, String, String>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
> +    String[] columnNames = cassandraQuery.getColumns(family);
> +    Query<K, T> query = cassandraQuery.getQuery();
> +    int limit = (int) query.getLimit();
> +    String startKey = (String) query.getStartKey();
> +    String endKey = (String) query.getEndKey();
> +
> +    if (startKey == null) {
> +      startKey = "";
> +    }
> +    if (endKey == null) {
> +      endKey = "";
> +    }
> +
> +
> +    RangeSuperSlicesQuery<String, String, String, String> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.stringSerializer, this.stringSerializer, this.stringSerializer, this.stringSerializer);
> +    rangeSuperSlicesQuery.setColumnFamily(family);
> +    rangeSuperSlicesQuery.setKeys(startKey, endKey);
> +    rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
> +    rangeSuperSlicesQuery.setRowCount(limit);
> +    rangeSuperSlicesQuery.setColumnNames(columnNames);
> +
> +
> +    QueryResult<OrderedSuperRows<String, String, String, String>> queryResult = rangeSuperSlicesQuery.execute();
> +    OrderedSuperRows<String, String, String, String> orderedRows = queryResult.get();
> +    return orderedRows.getList();
> +
> +
> +  }
> +}
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java Fri Jul 22 00:33:59 2011
> @@ -1,50 +1,155 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.store;
>
> +import java.io.IOException;
> +import java.util.ArrayList;
>  import java.util.HashMap;
> +import java.util.List;
>  import java.util.Map;
> -import java.util.Set;
> +
> +import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
> +import me.prettyprint.cassandra.service.ThriftCfDef;
> +import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
> +import me.prettyprint.hector.api.ddl.ColumnType;
> +import me.prettyprint.hector.api.ddl.ComparatorType;
> +
> +import org.jdom.Document;
> +import org.jdom.Element;
> +import org.jdom.JDOMException;
> +import org.jdom.input.SAXBuilder;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
>  public class CassandraMapping {
> +
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class);
> +
> +  private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
> +  private static final String KEYSPACE_ELEMENT = "keyspace";
> +  private static final String NAME_ATTRIBUTE = "name";
> +  private static final String MAPPING_ELEMENT = "class";
> +  private static final String COLUMN_ATTRIBUTE = "qualifier";
> +  private static final String FAMILY_ATTRIBUTE = "family";
> +  private static final String SUPER_ATTRIBUTE = "type";
> +  private static final String CLUSTER_ATTRIBUTE = "cluster";
> +  private static final String HOST_ATTRIBUTE = "host";
> +
> +
> +  private String hostName;
> +  private String clusterName;
> +  private String keyspaceName;
> +
> +
> +  /**
> +   * List of the super column families.
> +   */
> +  private List<String> superFamilies = new ArrayList<String>();
> +
> +  /**
> +   * Look up the column family associated to the Avro field.
> +   */
> +  private Map<String, String> familyMap = new HashMap<String, String>();
> +
> +  /**
> +   * Look up the column associated to the Avro field.
> +   */
> +  private Map<String, String> columnMap = new HashMap<String, String>();
> +
> +  /**
> +   * Look up the column family from its name.
> +   */
> +  private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = new HashMap<String, BasicColumnFamilyDefinition>();
>
> -  private String keySpace;
> +  public String getHostName() {
> +    return this.hostName;
> +  }
>
> -  private Map<String, Boolean> families =
> -    new HashMap<String, Boolean>();
> +  public String getClusterName() {
> +    return this.clusterName;
> +  }
>
> -  public String getKeySpace() {
> -    return keySpace;
> +  public String getKeyspaceName() {
> +    return this.keyspaceName;
>   }
>
> -  public void setKeySpace(String keySpace) {
> -    this.keySpace = keySpace;
> +
> +  @SuppressWarnings("unchecked")
> +  public void loadConfiguration() throws JDOMException, IOException {
> +    SAXBuilder saxBuilder = new SAXBuilder();
> +    Document document = saxBuilder.build(getClass().getClassLoader().getResourceAsStream(MAPPING_FILE));
> +    Element root = document.getRootElement();
> +
> +    Element keyspace = root.getChild(KEYSPACE_ELEMENT);
> +    this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
> +    this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
> +    this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
> +
> +    // load column family definitions
> +    List<Element> elements = keyspace.getChildren();
> +    for (Element element: elements) {
> +      BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
> +
> +      String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
> +
> +      String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
> +      if (superAttribute != null) {
> +        this.superFamilies.add(familyName);
> +        cfDef.setColumnType(ColumnType.SUPER);
> +        cfDef.setSubComparatorType(ComparatorType.UTF8TYPE);
> +      }
> +
> +      cfDef.setKeyspaceName(this.keyspaceName);
> +      cfDef.setName(familyName);
> +      cfDef.setComparatorType(ComparatorType.UTF8TYPE);
> +      cfDef.setDefaultValidationClass(ComparatorType.UTF8TYPE.getClassName());
> +
> +      this.columnFamilyDefinitions.put(familyName, cfDef);
> +
> +    }
> +
> +    // load column definitions
> +    Element mapping = root.getChild(MAPPING_ELEMENT);
> +    elements = mapping.getChildren();
> +    for (Element element: elements) {
> +      String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
> +      String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
> +      String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
> +      BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName);
> +      if (columnFamilyDefinition == null) {
> +        LOG.warn("Family " + familyName + " was not declared in the keyspace.");
> +      }
> +
> +      this.familyMap.put(fieldName, familyName);
> +      this.columnMap.put(fieldName, columnName);
> +
> +    }
>   }
>
> -  public Set<String> getColumnFamilies() {
> -    return families.keySet();
> +  public String getFamily(String name) {
> +    return this.familyMap.get(name);
>   }
>
> -  public void addColumnFamily(String columnFamily, boolean isSuper) {
> -    families.put(columnFamily, isSuper);
> +  public String getColumn(String name) {
> +    return this.columnMap.get(name);
>   }
>
> -  public boolean isColumnFamilySuper(String columnFamily) {
> -    return families.get(columnFamily);
> +  /**
> +   * Read family super attribute.
> +   * @param family the family name
> +   * @return true is the family is a super column family
> +   */
> +  public boolean isSuper(String family) {
> +    return this.superFamilies.indexOf(family) != -1;
>   }
> +
> +  public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
> +    List<ColumnFamilyDefinition> list = new ArrayList<ColumnFamilyDefinition>();
> +    for (String key: this.columnFamilyDefinitions.keySet()) {
> +      ColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(key);
> +      ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition);
> +      list.add(thriftCfDef);
> +    }
> +
> +    return list;
> +  }
> +
>  }
>
> Modified: incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
> +++ incubator/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Fri Jul 22 00:33:59 2011
> @@ -1,465 +1,334 @@
> -/**
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *     http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
>  package org.apache.gora.cassandra.store;
>
>  import java.io.IOException;
> -import java.net.InetAddress;
>  import java.util.ArrayList;
>  import java.util.HashMap;
>  import java.util.List;
>  import java.util.Map;
> -import java.util.Map.Entry;
> -import java.util.Properties;
> -import java.util.Set;
> +
> +import me.prettyprint.hector.api.beans.ColumnSlice;
> +import me.prettyprint.hector.api.beans.HColumn;
> +import me.prettyprint.hector.api.beans.HSuperColumn;
> +import me.prettyprint.hector.api.beans.Row;
> +import me.prettyprint.hector.api.beans.SuperRow;
> +import me.prettyprint.hector.api.beans.SuperSlice;
>
>  import org.apache.avro.Schema;
>  import org.apache.avro.Schema.Field;
>  import org.apache.avro.Schema.Type;
>  import org.apache.avro.generic.GenericArray;
>  import org.apache.avro.util.Utf8;
> -import org.apache.cassandra.thrift.TokenRange;
> -import org.apache.gora.cassandra.client.CassandraClient;
> -import org.apache.gora.cassandra.client.Mutate;
> -import org.apache.gora.cassandra.client.Row;
> -import org.apache.gora.cassandra.client.Select;
> -import org.apache.gora.cassandra.client.SimpleCassandraClient;
> -import org.apache.gora.cassandra.query.CassandraPartitionQuery;
>  import org.apache.gora.cassandra.query.CassandraQuery;
>  import org.apache.gora.cassandra.query.CassandraResult;
> -import org.apache.gora.persistency.ListGenericArray;
> +import org.apache.gora.cassandra.query.CassandraResultSet;
> +import org.apache.gora.cassandra.query.CassandraRow;
> +import org.apache.gora.cassandra.query.CassandraSubColumn;
> +import org.apache.gora.cassandra.query.CassandraSuperColumn;
>  import org.apache.gora.persistency.Persistent;
> -import org.apache.gora.persistency.State;
> -import org.apache.gora.persistency.StateManager;
>  import org.apache.gora.persistency.StatefulHashMap;
> -import org.apache.gora.persistency.StatefulMap;
> +import org.apache.gora.persistency.impl.PersistentBase;
> +import org.apache.gora.persistency.impl.StateManagerImpl;
>  import org.apache.gora.query.PartitionQuery;
>  import org.apache.gora.query.Query;
>  import org.apache.gora.query.Result;
> -import org.apache.gora.store.DataStoreFactory;
> +import org.apache.gora.query.impl.PartitionQueryImpl;
>  import org.apache.gora.store.impl.DataStoreBase;
> -import org.apache.gora.util.ByteUtils;
> -import org.jdom.Document;
> -import org.jdom.Element;
> -import org.jdom.input.SAXBuilder;
> -
> -/**
> - * DataStore for Cassandra.
> - *
> - * <p> Note: CassandraStore is not thread-safe. </p>
> - */
> -public class CassandraStore<K, T extends Persistent>
> -extends DataStoreBase<K, T> {
> -
> -  private static final String ERROR_MESSAGE =
> -    "Cassandra does not support creating or modifying ColumnFamilies during runtime";
> -
> -  private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml";
> -
> -  private static final int SPLIT_SIZE = 65536;
> -
> -  private static final int BATCH_COUNT = 256;
> -
> -  private CassandraClient client;
> -
> -  private Map<String, CassandraColumn> columnMap;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
>
> -  private CassandraMapping mapping;
> -
> -  @Override
> -  public void initialize(Class<K> keyClass, Class<T> persistentClass,
> -      Properties properties) throws IOException {
> -    super.initialize(keyClass, persistentClass, properties);
> -
> -    String mappingFile =
> -      DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
> -
> -    readMapping(mappingFile);
> +public class CassandraStore<K, T extends Persistent> extends DataStoreBase<K, T> {
> +  public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class);
> +
> +  private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
> +
> +  /**
> +   * The values are Avro fields pending to be stored.
> +   */
> +  private Map<K, T> buffer = new HashMap<K, T>();
> +
> +  public CassandraStore() throws Exception {
> +    this.cassandraClient.init();
>   }
>
>   @Override
> -  public String getSchemaName() {
> -    return mapping.getKeySpace();
> +  public void close() throws IOException {
> +    LOG.debug("close");
> +    flush();
>   }
>
>   @Override
> -  public void createSchema() throws IOException {
> -    throw new UnsupportedOperationException(ERROR_MESSAGE);
> +  public void createSchema() {
> +    LOG.debug("create schema");
> +    this.cassandraClient.checkKeyspace();
>   }
>
>   @Override
> -  public void deleteSchema() throws IOException {
> -    throw new UnsupportedOperationException(ERROR_MESSAGE);
> +  public boolean delete(K key) throws IOException {
> +    LOG.debug("delete " + key);
> +    return false;
>   }
>
>   @Override
> -  public boolean schemaExists() throws IOException {
> -    return true;
> +  public long deleteByQuery(Query<K, T> query) throws IOException {
> +    LOG.debug("delete by query " + query);
> +    return 0;
>   }
>
> -  public CassandraClient getClientByLocation(String endPoint) {
> -    return client;
> +  @Override
> +  public void deleteSchema() throws IOException {
> +    LOG.debug("delete schema");
> +    this.cassandraClient.dropKeyspace();
>   }
>
> -  public Select createSelect(String[] fields) {
> -    Select select = new Select();
> -    if (fields == null) {
> -      fields = beanFactory.getCachedPersistent().getFields();
> -    }
> -    for (String f : fields) {
> -      CassandraColumn col = columnMap.get(f);
> -      Schema fieldSchema = fieldMap.get(f).schema();
> -      switch (fieldSchema.getType()) {
> -        case MAP:
> -        case ARRAY:
> -          if (col.isSuperColumn()) {
> -            select.addAllColumnsForSuperColumn(col.family, col.superColumn);
> -          } else {
> -            select.addColumnAll(col.family);
> -          }
> -          break;
> -        default:
> -          if (col.isSuperColumn()) {
> -            select.addColumnName(col.family, col.superColumn, col.column);
> -          } else {
> -            select.addColumnName(col.family, col.column);
> -          }
> -          break;
> +  @Override
> +  public Result<K, T> execute(Query<K, T> query) throws IOException {
> +
> +    Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query);
> +    Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
> +
> +    CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
> +    cassandraQuery.setQuery(query);
> +    cassandraQuery.setFamilyMap(familyMap);
> +
> +    CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, query);
> +    cassandraResult.setReverseMap(reverseMap);
> +
> +    CassandraResultSet cassandraResultSet = new CassandraResultSet();
> +
> +    // We query Cassandra keyspace by families.
> +    for (String family : familyMap.keySet()) {
> +      if (this.cassandraClient.isSuper(family)) {
> +        addSuperColumns(family, cassandraQuery, cassandraResultSet);
> +
> +      } else {
> +        addSubColumns(family, cassandraQuery, cassandraResultSet);
> +
>       }
> +
>     }
> -    return select;
> -  }
> +
> +    cassandraResult.setResultSet(cassandraResultSet);
> +
> +
> +    return cassandraResult;
>
> -  @Override
> -  public T get(K key, String[] fields) throws IOException {
> -    if (fields == null) {
> -      fields = beanFactory.getCachedPersistent().getFields();
> -    }
> -    Select select = createSelect(fields);
> -    try {
> -      Row result = client.get(key.toString(), select);
> -      return newInstance(result, fields);
> -    } catch (Exception e) {
> -      throw new IOException(e);
> -    }
>   }
>
> -  @SuppressWarnings("rawtypes")
> -  private void setField(T persistent, Field field, StatefulMap map) {
> -    persistent.put(field.pos(), map);
> -  }
> -
> -  private void setField(T persistent, Field field, byte[] val)
> -  throws IOException {
> -    persistent.put(field.pos()
> -        , ByteUtils.fromBytes(val, field.schema(), datumReader, persistent.get(field.pos())));
> -  }
> -
> -  @SuppressWarnings("rawtypes")
> -  private void setField(T persistent, Field field, GenericArray list) {
> -    persistent.put(field.pos(), list);
> -  }
> -
> -  @SuppressWarnings({ "rawtypes", "unchecked" })
> -  public T newInstance(Row result, String[] fields)
> -  throws IOException {
> -    if(result == null)
> -      return null;
> -
> -    T persistent = newPersistent();
> -    StateManager stateManager = persistent.getStateManager();
> -    for (String f : fields) {
> -      CassandraColumn col = columnMap.get(f);
> -      Field field = fieldMap.get(f);
> -      Schema fieldSchema = field.schema();
> -      Map<String, byte[]> qualMap;
> -      switch(fieldSchema.getType()) {
> -        case MAP:
> -          if (col.isSuperColumn()) {
> -            qualMap = result.getSuperColumn(col.family, col.superColumn);
> -          } else {
> -            qualMap = result.getColumn(col.family);
> -          }
> -          if (qualMap == null) {
> -            continue;
> -          }
> -          Schema valueSchema = fieldSchema.getValueType();
> -          StatefulMap map = new StatefulHashMap();
> -          for (Entry<String, byte[]> e : qualMap.entrySet()) {
> -            Utf8 mapKey = new Utf8(e.getKey());
> -            map.put(mapKey, ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
> -            map.putState(mapKey, State.CLEAN);
> -          }
> -          setField(persistent, field, map);
> -          break;
> -        case ARRAY:
> -          if (col.isSuperColumn()) {
> -            qualMap = result.getSuperColumn(col.family, col.superColumn);
> -          } else {
> -            qualMap = result.getColumn(col.family);
> -          }
> -          if (qualMap == null) {
> -            continue;
> -          }
> -          valueSchema = fieldSchema.getElementType();
> -          ArrayList arrayList = new ArrayList();
> -          for (Entry<String, byte[]> e : qualMap.entrySet()) {
> -            arrayList.add(ByteUtils.fromBytes(e.getValue(), valueSchema, datumReader, null));
> -          }
> -          ListGenericArray arr = new ListGenericArray(fieldSchema, arrayList);
> -          setField(persistent, field, arr);
> -          break;
> -        default:
> -          byte[] val;
> -          if (col.isSuperColumn()) {
> -            val = result.get(col.family, col.superColumn, col.column);
> -          } else {
> -            val = result.get(col.family, col.column);
> -          }
> -          if (val == null) {
> -            continue;
> -          }
> -          setField(persistent, field, val);
> -          break;
> +  private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
> +      CassandraResultSet cassandraResultSet) {
> +    // select family columns that are included in the query
> +    List<Row<String, String, String>> rows = this.cassandraClient.execute(cassandraQuery, family);
> +
> +    for (Row<String, String, String> row : rows) {
> +      String key = row.getKey();
> +
> +      // find associated row in the resultset
> +      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
> +      if (cassandraRow == null) {
> +        cassandraRow = new CassandraRow();
> +        cassandraResultSet.putRow(key, cassandraRow);
> +        cassandraRow.setKey(key);
>       }
> +
> +      ColumnSlice<String, String> columnSlice = row.getColumnSlice();
> +
> +      for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
> +        CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
> +        cassandraSubColumn.setValue(hColumn);
> +        cassandraSubColumn.setFamily(family);
> +        cassandraRow.add(cassandraSubColumn);
> +      }
> +
>     }
> -    stateManager.clearDirty(persistent);
> -    return persistent;
>   }
>
> -  @Override
> -  public void put(K key, T obj) throws IOException {
> -    Mutate mutate = new Mutate();
> -    Schema schema = obj.getSchema();
> -    StateManager stateManager = obj.getStateManager();
> -    List<Field> fields = schema.getFields();
> -    String qual;
> -    byte[] value;
> -    for (int i = 0; i < fields.size(); i++) {
> -      if (!stateManager.isDirty(obj, i)) {
> -        continue;
> +  private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery,
> +      CassandraResultSet cassandraResultSet) {
> +
> +    List<SuperRow<String, String, String, String>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
> +    for (SuperRow<String, String, String, String> superRow: superRows) {
> +      String key = superRow.getKey();
> +      CassandraRow cassandraRow = cassandraResultSet.getRow(key);
> +      if (cassandraRow == null) {
> +        cassandraRow = new CassandraRow();
> +        cassandraResultSet.putRow(key, cassandraRow);
> +        cassandraRow.setKey(key);
>       }
> -      Field field = fields.get(i);
> -      Type type = field.schema().getType();
> -      Object o = obj.get(i);
> -      CassandraColumn col = columnMap.get(field.name());
> -
> -      switch(type) {
> -      case MAP:
> -        if(o instanceof StatefulMap) {
> -          @SuppressWarnings("unchecked")
> -          StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) o;
> -          for (Entry<Utf8, State> e : map.states().entrySet()) {
> -            Utf8 mapKey = e.getKey();
> -            switch (e.getValue()) {
> -            case DIRTY:
> -              qual = mapKey.toString();
> -              value = ByteUtils.toBytes(map.get(mapKey), field.schema().getValueType(), datumWriter);
> -              if (col.isSuperColumn()) {
> -                mutate.put(col.family, col.superColumn, qual, value);
> -              } else {
> -                mutate.put(col.family, qual, value);
> -              }
> -              break;
> -            case DELETED:
> -              qual = mapKey.toString();
> -              if (col.isSuperColumn()) {
> -                mutate.delete(col.family, col.superColumn, qual);
> -              } else {
> -                mutate.delete(col.family, qual);
> -              }
> -              break;
> -            }
> -          }
> -        } else {
> -          @SuppressWarnings({ "rawtypes", "unchecked" })
> -          Set<Map.Entry> set = ((Map)o).entrySet();
> -          for(@SuppressWarnings("rawtypes") Entry entry: set) {
> -            qual = entry.getKey().toString();
> -            value = ByteUtils.toBytes(entry.getValue().toString());
> -            if (col.isSuperColumn()) {
> -              mutate.put(col.family, col.superColumn, qual, value);
> -            } else {
> -              mutate.put(col.family, qual, value);
> -            }
> -          }
> -        }
> -        break;
> -      case ARRAY:
> -        if(o instanceof GenericArray) {
> -          @SuppressWarnings("rawtypes")
> -          GenericArray arr = (GenericArray) o;
> -          int j=0;
> -          for(Object item : arr) {
> -            value = ByteUtils.toBytes(item.toString());
> -            if (col.isSuperColumn()) {
> -              mutate.put(col.family, col.superColumn, Integer.toString(j), value);
> -            } else {
> -              mutate.put(col.family, Integer.toString(j), value);
> -            }
> -            j++;
> -          }
> -        }
> -        break;
> -      default:
> -        value = ByteUtils.toBytes(o, field.schema(), datumWriter);
> -        if (col.isSuperColumn()) {
> -          mutate.put(col.family, col.superColumn, col.column, value);
> -        } else {
> -          mutate.put(col.family, col.column, value);
> -        }
> -        break;
> +
> +      SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
> +      for (HSuperColumn<String, String, String> hSuperColumn: superSlice.getSuperColumns()) {
> +        CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
> +        cassandraSuperColumn.setValue(hSuperColumn);
> +        cassandraSuperColumn.setFamily(family);
> +        cassandraRow.add(cassandraSuperColumn);
>       }
>     }
> -
> -    if(!mutate.isEmpty())
> -      client.mutate(key.toString(), mutate);
>   }
>
> -  @Override
> -  public boolean delete(K key) throws IOException {
> -    Mutate mutate = new Mutate();
> -    for (String family : mapping.getColumnFamilies()) {
> -      mutate.deleteAll(family);
> +  /**
> +   * Flush the buffer. Write the buffered rows.
> +   * @see org.apache.gora.store.DataStore#flush()
> +   */
> +  @Override
> +  public void flush() throws IOException {
> +    for (K key: this.buffer.keySet()) {
> +      T value = this.buffer.get(key);
> +      Schema schema = value.getSchema();
> +      for (Field field: schema.getFields()) {
> +        if (value.isDirty(field.pos())) {
> +          addOrUpdateField((String) key, field, value.get(field.pos()));
> +        }
> +      }
>     }
> -
> -    client.mutate(key.toString(), mutate);
> -    return true;
> +
> +    this.buffer.clear();
>   }
>
>   @Override
> -  public void flush() throws IOException { }
> -
> -  @Override
> -  public void close() throws IOException {
> -    client.close();
> +  public T get(K key, String[] fields) throws IOException {
> +    LOG.info("get " + key);
> +    return null;
>   }
>
>   @Override
> -  public Query<K, T> newQuery() {
> -    return new CassandraQuery<K, T>(this);
> +  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
> +      throws IOException {
> +    // just a single partition
> +    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
> +    partitions.add(new PartitionQueryImpl<K,T>(query));
> +    return partitions;
>   }
>
>   @Override
> -  public long deleteByQuery(Query<K, T> query) throws IOException {
> -    // TODO Auto-generated method stub
> -    return 0;
> +  public String getSchemaName() {
> +    LOG.info("get schema name");
> +    return null;
>   }
>
>   @Override
> -  public Result<K, T> execute(Query<K, T> query) throws IOException {
> -    return new CassandraResult<K, T>(this, query, BATCH_COUNT);
> +  public Query<K, T> newQuery() {
> +    return new CassandraQuery<K, T>(this);
>   }
>
> -  @Override
> -  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
> -  throws IOException {
> -    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>();
> -
> -    List<TokenRange> rangeList = client.describeRing();
> -    for (TokenRange range : rangeList) {
> -      List<String> tokens =
> -        client.describeSplits(range.start_token, range.end_token, SPLIT_SIZE);
> -      // turn the sub-ranges into InputSplits
> -      String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
> -      // hadoop needs hostname, not ip
> -      for (int i = 0; i < endpoints.length; i++) {
> -          endpoints[i] = InetAddress.getByName(endpoints[i]).getHostName();
> -      }
> -
> -      for (int i = 1; i < tokens.size(); i++) {
> -        CassandraPartitionQuery<K, T> partitionQuery =
> -          new CassandraPartitionQuery<K, T>(query, tokens.get(i - 1), tokens.get(i), endpoints, SPLIT_SIZE);
> -        partitions.add(partitionQuery);
> +  /**
> +   * Duplicate instance to keep all the objects in memory till flushing.
> +   * @see org.apache.gora.store.DataStore#put(java.lang.Object, org.apache.gora.persistency.Persistent)
> +   */
> +  @Override
> +  public void put(K key, T value) throws IOException {
> +    T p = (T) value.newInstance(new StateManagerImpl());
> +    Schema schema = value.getSchema();
> +    for (Field field: schema.getFields()) {
> +      if (value.isDirty(field.pos())) {
> +        Object fieldValue = value.get(field.pos());
> +
> +        // check if field has a nested structure (map or record)
> +        Schema fieldSchema = field.schema();
> +        Type type = fieldSchema.getType();
> +        switch(type) {
> +          case RECORD:
> +            Persistent persistent = (Persistent) fieldValue;
> +            Persistent newRecord = persistent.newInstance(new StateManagerImpl());
> +            for (Field member: fieldSchema.getFields()) {
> +              newRecord.put(member.pos(), persistent.get(member.pos()));
> +            }
> +            fieldValue = newRecord;
> +            break;
> +          case MAP:
> +            StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
> +            StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
> +            fieldValue = newMap;
> +            break;
> +        }
> +
> +        p.put(field.pos(), fieldValue);
>       }
>     }
> -    return partitions;
> -  }
> -
> -  private CassandraClient createClient() throws IOException {
> -    String serverStr =
> -      DataStoreFactory.findPropertyOrDie(properties, this, "servers");
> -    String[] server1Parts = serverStr.split(",")[0].split(":");
> -    try {
> -      return new SimpleCassandraClient(server1Parts[0],
> -          Integer.parseInt(server1Parts[1]), mapping.getKeySpace());
> -    } catch (Exception e) {
> -      throw new IOException(e);
> -    }
> -  }
> -
> -  @SuppressWarnings("unchecked")
> -  protected void readMapping(String filename) throws IOException {
> -
> -    mapping = new CassandraMapping();
> -    columnMap = new HashMap<String, CassandraColumn>();
> -
> -    try {
> -      SAXBuilder builder = new SAXBuilder();
> -      Document doc = builder.build(getClass().getClassLoader()
> -          .getResourceAsStream(filename));
> -
> -      List<Element> classes = doc.getRootElement().getChildren("class");
> -
> -      for(Element classElement: classes) {
> -        if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
> -            && classElement.getAttributeValue("name").equals(
> -                persistentClass.getCanonicalName())) {
> -
> -          String keySpace = classElement.getAttributeValue("keyspace");
> -          mapping.setKeySpace(keySpace);
> -          client = createClient();
> -          Map<String, Map<String, String>> keySpaceDesc = client.describeKeySpace();
> -          for (Entry<String, Map<String, String>> e : keySpaceDesc.entrySet()) {
> -            boolean isSuper = e.getValue().get("Type").equals("Super");
> -            mapping.addColumnFamily(e.getKey(), isSuper);
> -          }
> -
> -          List<Element> fields = classElement.getChildren("field");
> -
> -          for(Element field:fields) {
> -            String fieldName = field.getAttributeValue("name");
> -            String path = field.getAttributeValue("path");
> -            String[] parts = path.split(":");
> -            String columnFamily = parts[0];
> -            String superColumn = null;
> -            String column = null;
> -
> -            boolean isSuper = mapping.isColumnFamilySuper(columnFamily);
> -            if (isSuper) {
> -              superColumn = parts[1];
> -              if (parts.length == 3) {
> -                column = parts[2];
> +
> +    this.buffer.put(key, p);
> + }
> +
> +  /**
> +   * Add a field to Cassandra according to its type.
> +   * @param key     the key of the row where the field should be added
> +   * @param field   the Avro field representing a datum
> +   * @param value   the field value
> +   */
> +  private void addOrUpdateField(String key, Field field, Object value) {
> +    Schema schema = field.schema();
> +    Type type = schema.getType();
> +    //LOG.info(field.name() + " " + type.name());
> +    switch (type) {
> +      case STRING:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case INT:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case LONG:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case BYTES:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case FLOAT:
> +        this.cassandraClient.addColumn(key, field.name(), value);
> +        break;
> +      case RECORD:
> +        if (value != null) {
> +          if (value instanceof PersistentBase) {
> +            PersistentBase persistentBase = (PersistentBase) value;
> +            for (Field member: schema.getFields()) {
> +
> +              // TODO: hack, do not store empty arrays
> +              Object memberValue = persistentBase.get(member.pos());
> +              if (memberValue instanceof GenericArray<?>) {
> +                GenericArray<String> array = (GenericArray<String>) memberValue;
> +                if (array.size() == 0) {
> +                  continue;
> +                }
>               }
> -            } else {
> -              if (parts.length == 2) {
> -                column = parts[1];
> +
> +              this.cassandraClient.addSubColumn(key, field.name(), member.name(), memberValue);
> +            }
> +          } else {
> +            LOG.info("Record not supported: " + value.toString());
> +
> +          }
> +        }
> +        break;
> +      case MAP:
> +        if (value != null) {
> +          if (value instanceof StatefulHashMap<?, ?>) {
> +            //TODO cast to stateful map and only write dirty keys
> +            Map<Utf8, Object> map = (Map<Utf8, Object>) value;
> +            for (Utf8 mapKey: map.keySet()) {
> +
> +              // TODO: hack, do not store empty arrays
> +              Object keyValue = map.get(mapKey);
> +              if (keyValue instanceof GenericArray<?>) {
> +                GenericArray<String> array = (GenericArray<String>) keyValue;
> +                if (array.size() == 0) {
> +                  continue;
> +                }
>               }
> +
> +              this.cassandraClient.addSubColumn(key, field.name(), mapKey.toString(), keyValue);
>             }
> -
> -            columnMap.put(fieldName,
> -                new CassandraColumn(columnFamily, superColumn, column));
> +          } else {
> +            LOG.info("Map not supported: " + value.toString());
>           }
> -
> -          break;
>         }
> -      }
> -    } catch(Exception ex) {
> -      throw new IOException(ex);
> +        break;
> +      default:
> +        LOG.info("Type not considered: " + type.name());
>     }
>   }
> -}
> \ No newline at end of file
> +
> +  @Override
> +  public boolean schemaExists() throws IOException {
> +    LOG.info("schema exists");
> +    return false;
> +  }
> +
> +}
>
> Modified: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
> URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1149420&r1=1149419&r2=1149420&view=diff
> ==============================================================================
> --- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (original)
> +++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Fri Jul 22 00:33:59 2011
> @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
>  public class GoraRecordReader<K, T extends Persistent> extends RecordReader<K,T> {
>   public static final Logger LOG = LoggerFactory.getLogger(GoraRecordReader.class);
>
> -  private static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
> -  private static final int BUFFER_LIMIT_READ_VALUE = 10000;
> +  public static final String BUFFER_LIMIT_READ_NAME = "gora.buffer.read.limit";
> +  public static final int BUFFER_LIMIT_READ_VALUE = 10000;
>
>   protected Query<K,T> query;
>   protected Result<K,T> result;
>
>
>

Mime
View raw message