incubator-gora-commits mailing list archives

From e...@apache.org
Subject svn commit: r1021390 [3/9] - in /incubator/gora/trunk: ./ bin/ conf/ docs/ gora-cassandra/ gora-cassandra/ivy/ gora-cassandra/src/examples/java/ gora-cassandra/src/main/java/org/apache/ gora-cassandra/src/main/java/org/apache/gora/ gora-cassandra/src/m...
Date Mon, 11 Oct 2010 15:40:29 GMT
Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/AvroStore.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.avro.store;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.gora.avro.query.AvroQuery;
+import org.apache.gora.avro.query.AvroResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.FileSplitPartitionQuery;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.impl.FileBackedDataStoreBase;
+import org.apache.gora.util.OperationNotSupportedException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An adapter DataStore for binary-compatible Avro serializations.
+ * AvroStore supports both binary and JSON serializations.
+ * @param <K> the key class
+ * @param <T> the persistent class
+ */
+public class AvroStore<K, T extends Persistent>
+  extends FileBackedDataStoreBase<K, T> implements Configurable {
+
+  /** The property key specifying the Avro encoder/decoder type to use. Accepted
+   * values are "BINARY" or "JSON". */
+  public static final String CODEC_TYPE_KEY = "codec.type";
+
+  /**
+   * The type of the avro Encoder/Decoder.
+   */
+  public static enum CodecType {
+    /** Avro binary encoder */
+    BINARY,
+    /** Avro JSON encoder */
+    JSON,
+  }
+
+  private DatumReader<T> datumReader;
+  private DatumWriter<T> datumWriter;
+  private Encoder encoder;
+  private Decoder decoder;
+
+  private CodecType codecType = CodecType.JSON;
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+
+    if(properties != null) {
+      String codecType = DataStoreFactory.findProperty(
+          properties, this, CODEC_TYPE_KEY, null);
+      if(codecType != null) {
+        //the property, when present, overrides the default codec type
+        this.codecType = CodecType.valueOf(codecType);
+      }
+    }
+  }
+
+  public void setCodecType(CodecType codecType) {
+    this.codecType = codecType;
+  }
+
+  public void setEncoder(Encoder encoder) {
+    this.encoder = encoder;
+  }
+
+  public void setDecoder(Decoder decoder) {
+    this.decoder = decoder;
+  }
+
+  public void setDatumReader(DatumReader<T> datumReader) {
+    this.datumReader = datumReader;
+  }
+
+  public void setDatumWriter(DatumWriter<T> datumWriter) {
+    this.datumWriter = datumWriter;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if(encoder != null) {
+      encoder.flush();
+    }
+    encoder = null;
+    decoder = null;
+  }
+
+  @Override
+  public boolean delete(K key) throws IOException {
+    throw new OperationNotSupportedException("delete is not supported for AvroStore");
+  }
+
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws IOException {
+    throw new OperationNotSupportedException("delete is not supported for AvroStore");
+  }
+
+  /**
+   * Executes a normal Query, reading the whole data. execute() calls this method
+   * for queries that are not PartitionQuerys.
+   */
+  @Override
+  protected Result<K,T> executeQuery(Query<K,T> query) throws IOException {
+    return new AvroResult<K,T>(this, (AvroQuery<K,T>)query,
+        getDatumReader(), getDecoder());
+  }
+
+  /**
+   * Executes a FileSplitPartitionQuery, reading the data between the split's start and end.
+   */
+  @Override
+  protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query)
+  throws IOException {
+    throw new OperationNotSupportedException("Not yet implemented");
+  }
+
+  @Override
+  public void flush() throws IOException {
+    super.flush();
+    if(encoder != null)
+      encoder.flush();
+  }
+
+  @Override
+  public T get(K key, String[] fields) throws IOException {
+    throw new OperationNotSupportedException();
+  }
+
+  @Override
+  public AvroQuery<K,T> newQuery() {
+    return new AvroQuery<K,T>(this);
+  }
+
+  @Override
+  public void put(K key, T obj) throws IOException {
+    getDatumWriter().write(obj, getEncoder());
+  }
+
+  public Encoder getEncoder() throws IOException {
+    if(encoder == null) {
+      encoder = createEncoder();
+    }
+    return encoder;
+  }
+
+  public Decoder getDecoder() throws IOException {
+    if(decoder == null) {
+      decoder = createDecoder();
+    }
+    return decoder;
+  }
+
+  public DatumReader<T> getDatumReader() {
+    if(datumReader == null) {
+      datumReader = createDatumReader();
+    }
+    return datumReader;
+  }
+
+  public DatumWriter<T> getDatumWriter() {
+    if(datumWriter == null) {
+      datumWriter = createDatumWriter();
+    }
+    return datumWriter;
+  }
+
+  protected Encoder createEncoder() throws IOException {
+    switch(codecType) {
+      case BINARY:
+        return new BinaryEncoder(getOrCreateOutputStream());
+      case JSON:
+        return new JsonEncoder(schema, getOrCreateOutputStream());
+    }
+    return null;
+  }
+
+  @SuppressWarnings("deprecation")
+  protected Decoder createDecoder() throws IOException {
+    switch(codecType) {
+      case BINARY:
+        return new BinaryDecoder(getOrCreateInputStream());
+      case JSON:
+        return new JsonDecoder(schema, getOrCreateInputStream());
+    }
+    return null;
+  }
+
+  protected DatumWriter<T> createDatumWriter() {
+    return new SpecificDatumWriter<T>(schema);
+  }
+
+  protected DatumReader<T> createDatumReader() {
+    return new SpecificDatumReader<T>(schema);
+  }
+
+  @Override
+  public Configuration getConf() {
+    if(conf == null) {
+      conf = new Configuration();
+    }
+    return conf;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+  }
+
+  @Override
+  public String getSchemaName() {
+    return "default";
+  }
+}
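
For orientation, a minimal usage sketch of the store above might look as follows. Employee is a hypothetical Avro-generated Persistent class, and the output-path setter is assumed to be inherited from the file-backed base class; treat this as a sketch rather than a tested example.

    AvroStore<String, Employee> store = new AvroStore<String, Employee>();
    store.initialize(String.class, Employee.class, new Properties());
    store.setCodecType(AvroStore.CodecType.JSON);      // or CodecType.BINARY
    store.setOutputPath("file:///tmp/employees.json"); // assumed inherited setter

    Employee employee = new Employee();                // hypothetical generated bean
    employee.setName(new Utf8("Alice"));
    store.put("alice", employee);   // encoded through the lazily created JsonEncoder
    store.flush();                  // flushes the Avro encoder and the stream
    store.close();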

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/avro/store/DataFileAvroStore.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.avro.store;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.gora.avro.mapreduce.FsInput;
+import org.apache.gora.avro.query.DataFileAvroResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.FileSplitPartitionQuery;
+import org.apache.gora.util.OperationNotSupportedException;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * DataFileAvroStore is a file-based store which uses Avro's
+ * DataFileWriter and DataFileReader as its backend. This datastore
+ * supports MapReduce.
+ */
+public class DataFileAvroStore<K, T extends Persistent> extends AvroStore<K, T> {
+
+  public DataFileAvroStore() {
+  }
+  
+  private DataFileWriter<T> writer;
+  
+  @Override
+  public T get(K key, String[] fields) throws java.io.IOException {
+    throw new OperationNotSupportedException(
+        "Avro DataFiles do not support indexed retrieval");
+  }
+  
+  @Override
+  public void put(K key, T obj) throws java.io.IOException {
+    getWriter().append(obj);
+  }
+  
+  private DataFileWriter<T> getWriter() throws IOException {
+    if(writer == null) {
+      writer = new DataFileWriter<T>(getDatumWriter());
+      writer.create(schema, getOrCreateOutputStream());
+    }
+    return writer;
+  }
+  
+  @Override
+  protected Result<K, T> executeQuery(Query<K, T> query) throws IOException {
+    return new DataFileAvroResult<K, T>(this, query
+        , createReader(createFsInput()));
+  }
+ 
+  @Override
+  protected Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query) 
+    throws IOException {
+    FsInput fsInput = createFsInput();
+    DataFileReader<T> reader = createReader(fsInput);
+    return new DataFileAvroResult<K, T>(this, query, reader, fsInput
+        , query.getStart(), query.getLength());
+  }
+  
+  private DataFileReader<T> createReader(FsInput fsInput) throws IOException {
+    return new DataFileReader<T>(fsInput, getDatumReader());
+  }
+  
+  private FsInput createFsInput() throws IOException {
+    Path path = new Path(getInputPath());
+    return new FsInput(path, getConf());
+  }
+  
+  @Override
+  public void flush() throws IOException {
+    super.flush();
+    if(writer != null) {
+      writer.flush();
+    }
+  }
+  
+  @Override
+  public void close() throws IOException {
+    if(writer != null)
+      writer.close(); //Hadoop 0.20.2 HDFS streams cannot be closed twice,
+                      //so close the writer before the underlying stream
+    writer = null;
+    super.close();
+  }
+}
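
A hedged write/read sketch for the DataFile-backed variant; WebPage is a hypothetical Persistent class, and the input/output path setters are again assumed from the file-backed base class.

    DataFileAvroStore<String, WebPage> store = new DataFileAvroStore<String, WebPage>();
    store.initialize(String.class, WebPage.class, new Properties());
    store.setOutputPath("hdfs://namenode/gora/webpage.avro");

    WebPage page = new WebPage();
    store.put("http://example.org/", page); // appended through the DataFileWriter
    store.close();                          // writer is closed before the HDFS stream

    // Reading back (shown on a fresh instance for clarity):
    DataFileAvroStore<String, WebPage> readStore = new DataFileAvroStore<String, WebPage>();
    readStore.initialize(String.class, WebPage.class, new Properties());
    readStore.setInputPath("hdfs://namenode/gora/webpage.avro");
    Result<String, WebPage> result = readStore.newQuery().execute();
    while(result.next()) {
      WebPage fetched = result.get();
    }
    result.close();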

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/compiler/GoraCompiler.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.compiler;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.specific.SpecificData;
+
+/** Generate specific Java interfaces and classes for protocols and schemas. */
+public class GoraCompiler {
+  private File dest;
+  private Writer out;
+  private Set<Schema> queue = new HashSet<Schema>();
+
+  private GoraCompiler(File dest) {
+    this.dest = dest;                             // root directory for output
+  }
+
+  /** Generates the Java interface and classes for a protocol.
+   * @param src the source Avro protocol file
+   * @param dest the directory to place generated files in
+   */
+  public static void compileProtocol(File src, File dest) throws IOException {
+    GoraCompiler compiler = new GoraCompiler(dest);
+    Protocol protocol = Protocol.parse(src);
+    for (Schema s : protocol.getTypes())          // enqueue types
+      compiler.enqueue(s);
+    compiler.compileInterface(protocol);          // generate interface
+    compiler.compile();                           // generate classes for types
+  }
+
+  /** Generates Java classes for a schema. */
+  public static void compileSchema(File src, File dest) throws IOException {
+    GoraCompiler compiler = new GoraCompiler(dest);
+    compiler.enqueue(Schema.parse(src));          // enqueue types
+    compiler.compile();                           // generate classes for types
+  }
+
+  private static String camelCasify(String s) {
+    return s.substring(0, 1).toUpperCase() + s.substring(1);
+  }
+
+  /** Converts a camelCase name to an UPPER_CASE name, inserting '_' at camel-case word boundaries. */
+  private static String toUpperCase(String s) {
+    StringBuilder builder = new StringBuilder();
+
+    for(int i=0; i<s.length(); i++) {
+      if(i > 0) {
+        if(Character.isUpperCase(s.charAt(i))
+         && Character.isLowerCase(s.charAt(i-1))
+         && Character.isLetter(s.charAt(i))) {
+          builder.append("_");
+        }
+      }
+      builder.append(Character.toUpperCase(s.charAt(i)));
+    }
+
+    return builder.toString();
+  }
+
+  /** Recursively enqueue schemas that need a class generated. */
+  private void enqueue(Schema schema) throws IOException {
+    if (queue.contains(schema)) return;
+    switch (schema.getType()) {
+    case RECORD:
+      queue.add(schema);
+      for (Field field : schema.getFields())
+        enqueue(field.schema());
+      break;
+    case MAP:
+      enqueue(schema.getValueType());
+      break;
+    case ARRAY:
+      enqueue(schema.getElementType());
+      break;
+    case UNION:
+      for (Schema s : schema.getTypes())
+        enqueue(s);
+      break;
+    case ENUM:
+    case FIXED:
+      queue.add(schema);
+      break;
+    case STRING: case BYTES:
+    case INT: case LONG:
+    case FLOAT: case DOUBLE:
+    case BOOLEAN: case NULL:
+      break;
+    default: throw new RuntimeException("Unknown type: "+schema);
+    }
+  }
+
+  /** Generate java classes for enqueued schemas. */
+  private void compile() throws IOException {
+    for (Schema schema : queue)
+      compile(schema);
+  }
+
+  private void compileInterface(Protocol protocol) throws IOException {
+    startFile(protocol.getName(), protocol.getNamespace());
+    try {
+      line(0, "public interface "+protocol.getName()+" {");
+
+      out.append("\n");
+      for (Map.Entry<String,Message> e : protocol.getMessages().entrySet()) {
+        String name = e.getKey();
+        Message message = e.getValue();
+        Schema request = message.getRequest();
+        Schema response = message.getResponse();
+        line(1, unbox(response)+" "+name+"("+params(request)+")");
+        line(2,"throws AvroRemoteException"+errors(message.getErrors())+";");
+      }
+      line(0, "}");
+    } finally {
+      out.close();
+    }
+  }
+
+  private void startFile(String name, String space) throws IOException {
+    File dir = new File(dest, space.replace('.', File.separatorChar));
+    if (!dir.exists())
+      if (!dir.mkdirs())
+        throw new IOException("Unable to create " + dir);
+    name = cap(name) + ".java";
+    out = new OutputStreamWriter(new FileOutputStream(new File(dir, name)));
+    header(space);
+  }
+
+  private void header(String namespace) throws IOException {
+    if(namespace != null) {
+      line(0, "package "+namespace+";\n");
+    }
+    line(0, "import java.nio.ByteBuffer;");
+    line(0, "import java.util.Map;");
+    line(0, "import java.util.HashMap;");
+    line(0, "import org.apache.avro.Protocol;");
+    line(0, "import org.apache.avro.Schema;");
+    line(0, "import org.apache.avro.AvroRuntimeException;");
+    line(0, "import org.apache.avro.Protocol;");
+    line(0, "import org.apache.avro.util.Utf8;");
+    line(0, "import org.apache.avro.ipc.AvroRemoteException;");
+    line(0, "import org.apache.avro.generic.GenericArray;");
+    line(0, "import org.apache.avro.specific.SpecificExceptionBase;");
+    line(0, "import org.apache.avro.specific.SpecificRecordBase;");
+    line(0, "import org.apache.avro.specific.SpecificRecord;");
+    line(0, "import org.apache.avro.specific.SpecificFixed;");
+    line(0, "import org.apache.gora.persistency.StateManager;");
+    line(0, "import org.apache.gora.persistency.impl.PersistentBase;");
+    line(0, "import org.apache.gora.persistency.impl.StateManagerImpl;");
+    line(0, "import org.apache.gora.persistency.StatefulHashMap;");
+    line(0, "import org.apache.gora.persistency.ListGenericArray;");
+    for (Schema s : queue)
+      if (namespace == null
+          ? (s.getNamespace() != null)
+          : !namespace.equals(s.getNamespace()))
+        line(0, "import "+SpecificData.get().getClassName(s)+";");
+    line(0, "");
+    line(0, "@SuppressWarnings(\"all\")");
+  }
+
+  private String params(Schema request) throws IOException {
+    StringBuilder b = new StringBuilder();
+    int count = 0;
+    for (Field field : request.getFields()) {
+      b.append(unbox(field.schema()));
+      b.append(" ");
+      b.append(field.name());
+      if (++count < request.getFields().size())
+        b.append(", ");
+    }
+    return b.toString();
+  }
+
+  private String errors(Schema errs) throws IOException {
+    StringBuilder b = new StringBuilder();
+    for (Schema error : errs.getTypes().subList(1, errs.getTypes().size())) {
+      b.append(", ");
+      b.append(error.getName());
+    }
+    return b.toString();
+  }
+
+  private void compile(Schema schema) throws IOException {
+    startFile(schema.getName(), schema.getNamespace());
+    try {
+      switch (schema.getType()) {
+      case RECORD:
+        String type = type(schema);
+        line(0, "public class "+ type
+             +" extends PersistentBase {");
+        // schema definition
+        line(1, "public static final Schema _SCHEMA = Schema.parse(\""
+             +esc(schema)+"\");");
+
+        //field information
+        line(1, "public static enum Field {");
+        int i=0;
+        for (Field field : schema.getFields()) {
+          line(2,toUpperCase(field.name())+"("+(i++)+ ",\"" + field.name() + "\"),");
+        }
+        line(2, ";");
+        line(2, "private int index;");
+        line(2, "private String name;");
+        line(2, "Field(int index, String name) {this.index=index;this.name=name;}");
+        line(2, "public int getIndex() {return index;}");
+        line(2, "public String getName() {return name;}");
+        line(2, "public String toString() {return name;}");
+        line(1, "};");
+
+        StringBuilder builder = new StringBuilder(
+        "public static final String[] _ALL_FIELDS = {");
+        for (Field field : schema.getFields()) {
+          builder.append("\"").append(field.name()).append("\",");
+        }
+        builder.append("};");
+        line(1, builder.toString());
+
+        line(1, "static {");
+        line(2, "PersistentBase.registerFields("+type+".class, _ALL_FIELDS);");
+        line(1, "}");
+
+        // field declarations
+        for (Field field : schema.getFields()) {
+          line(1,"private "+unbox(field.schema())+" "+field.name()+";");
+        }
+
+        //constructors
+        line(1, "public " + type + "() {");
+        line(2, "this(new StateManagerImpl());");
+        line(1, "}");
+        line(1, "public " + type + "(StateManager stateManager) {");
+        line(2, "super(stateManager);");
+        for (Field field : schema.getFields()) {
+          Schema fieldSchema = field.schema();
+          switch (fieldSchema.getType()) {
+          case ARRAY:
+            String valueType = type(fieldSchema.getElementType());
+            line(2, field.name()+" = new ListGenericArray<"+valueType+">(getSchema()" +
+                ".getField(\""+field.name()+"\").schema());");
+            break;
+          case MAP:
+            valueType = type(fieldSchema.getValueType());
+            line(2, field.name()+" = new StatefulHashMap<Utf8,"+valueType+">();");
+          }
+        }
+        line(1, "}");
+
+        //newInstance(StateManager)
+        line(1, "public " + type + " newInstance(StateManager stateManager) {");
+        line(2, "return new " + type + "(stateManager);" );
+        line(1, "}");
+
+        // schema method
+        line(1, "public Schema getSchema() { return _SCHEMA; }");
+        // get method
+        line(1, "public Object get(int _field) {");
+        line(2, "switch (_field) {");
+        i = 0;
+        for (Field field : schema.getFields()) {
+          line(2, "case "+(i++)+": return "+field.name()+";");
+        }
+        line(2, "default: throw new AvroRuntimeException(\"Bad index\");");
+        line(2, "}");
+        line(1, "}");
+        // put method
+        line(1, "@SuppressWarnings(value=\"unchecked\")");
+        line(1, "public void put(int _field, Object _value) {");
+        line(2, "if(isFieldEqual(_field, _value)) return;");
+        line(2, "getStateManager().setDirty(this, _field);");
+        line(2, "switch (_field) {");
+        i = 0;
+        for (Field field : schema.getFields()) {
+          line(2, "case "+i+":"+field.name()+" = ("+
+               type(field.schema())+")_value; break;");
+          i++;
+        }
+        line(2, "default: throw new AvroRuntimeException(\"Bad index\");");
+        line(2, "}");
+        line(1, "}");
+
+        // java bean style getters and setters
+        i = 0;
+        for (Field field : schema.getFields()) {
+          String camelKey = camelCasify(field.name());
+          Schema fieldSchema = field.schema();
+          switch (fieldSchema.getType()) {
+          case INT:case LONG:case FLOAT:case DOUBLE:
+          case BOOLEAN:case BYTES:case STRING: case ENUM: case RECORD:
+            String unboxed = unbox(fieldSchema);
+            line(1, "public "+unboxed+" get" +camelKey+"() {");
+            line(2, "return ("+type(field.schema())+") get("+i+");");
+            line(1, "}");
+            line(1, "public void set"+camelKey+"("+unboxed+" value) {");
+            line(2, "put("+i+", value);");
+            line(1, "}");
+            break;
+          case ARRAY:
+            unboxed = unbox(fieldSchema.getElementType());
+
+            line(1, "public GenericArray<"+unboxed+"> get"+camelKey+"() {");
+            line(2, "return (GenericArray<"+unboxed+">) get("+i+");");
+            line(1, "}");
+            line(1, "public void addTo"+camelKey+"("+unboxed+" element) {");
+            line(2, "getStateManager().setDirty(this, "+i+");");
+            line(2, field.name()+".add(element);");
+            line(1, "}");
+            break;
+          case MAP:
+            unboxed = unbox(fieldSchema.getValueType());
+            line(1, "public Map<Utf8, "+unboxed+"> get"+camelKey+"() {");
+            line(2, "return (Map<Utf8, "+unboxed+">) get("+i+");");
+            line(1, "}");
+            line(1, "public "+unboxed+" getFrom"+camelKey+"(Utf8 key) {");
+            line(2, "if ("+field.name()+" == null) { return null; }");
+            line(2, "return "+field.name()+".get(key);");
+            line(1, "}");
+            line(1, "public void putTo"+camelKey+"(Utf8 key, "+unboxed+" value) {");
+            line(2, "getStateManager().setDirty(this, "+i+");");
+            line(2, field.name()+".put(key, value);");
+            line(1, "}");
+            line(1, "public "+unboxed+" removeFrom"+camelKey+"(Utf8 key) {");
+            line(2, "if ("+field.name()+" == null) { return null; }");
+            line(2, "getStateManager().setDirty(this, "+i+");");
+            line(2, "return "+field.name()+".remove(key);");
+            line(1, "}");
+          }
+          i++;
+        }
+        line(0, "}");
+
+        break;
+      case ENUM:
+        line(0, "public enum "+type(schema)+" { ");
+        StringBuilder b = new StringBuilder();
+        int count = 0;
+        for (String symbol : schema.getEnumSymbols()) {
+          b.append(symbol);
+          if (++count < schema.getEnumSymbols().size())
+            b.append(", ");
+        }
+        line(1, b.toString());
+        line(0, "}");
+        break;
+      case FIXED:
+        line(0, "@FixedSize("+schema.getFixedSize()+")");
+        line(0, "public class "+type(schema)+" extends SpecificFixed {}");
+        break;
+      case MAP: case ARRAY: case UNION: case STRING: case BYTES:
+      case INT: case LONG: case FLOAT: case DOUBLE: case BOOLEAN: case NULL:
+        break;
+      default: throw new RuntimeException("Unknown type: "+schema);
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
+
+  public static String type(Schema schema) {
+    switch (schema.getType()) {
+    case RECORD:
+    case ENUM:
+    case FIXED:
+      return schema.getName();
+    case ARRAY:
+      return "GenericArray<"+type(schema.getElementType())+">";
+    case MAP:
+      return "Map<Utf8,"+type(schema.getValueType())+">";
+    case UNION:
+      List<Schema> types = schema.getTypes();     // elide unions with null
+      if ((types.size() == 2) && types.contains(NULL_SCHEMA))
+        return type(types.get(types.get(0).equals(NULL_SCHEMA) ? 1 : 0));
+      return "Object";
+    case STRING:  return "Utf8";
+    case BYTES:   return "ByteBuffer";
+    case INT:     return "Integer";
+    case LONG:    return "Long";
+    case FLOAT:   return "Float";
+    case DOUBLE:  return "Double";
+    case BOOLEAN: return "Boolean";
+    case NULL:    return "Void";
+    default: throw new RuntimeException("Unknown type: "+schema);
+    }
+  }
+
+  public static String unbox(Schema schema) {
+    switch (schema.getType()) {
+    case INT:     return "int";
+    case LONG:    return "long";
+    case FLOAT:   return "float";
+    case DOUBLE:  return "double";
+    case BOOLEAN: return "boolean";
+    default:      return type(schema);
+    }
+  }
+
+  private void line(int indent, String text) throws IOException {
+    for (int i = 0; i < indent; i ++) {
+      out.append("  ");
+    }
+    out.append(text);
+    out.append("\n");
+  }
+
+  static String cap(String name) {
+    return name.substring(0,1).toUpperCase()+name.substring(1,name.length());
+  }
+
+  private static String esc(Object o) {
+    return o.toString().replace("\"", "\\\"");
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: GoraCompiler <schema file> <output dir>");
+      System.exit(1);
+    }
+    compileSchema(new File(args[0]), new File(args[1]));
+  }
+
+}
+
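
The compiler can be run from the command line through main(), or invoked programmatically; a brief sketch (the schema path and output directory are illustrative only):

    // Command line:
    //   java org.apache.gora.compiler.GoraCompiler src/main/avro/employee.avsc src/main/java
    // Programmatic equivalent:
    GoraCompiler.compileSchema(new File("src/main/avro/employee.avsc"),
                               new File("src/main/java"));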

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/FakeResolvingDecoder.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/FakeResolvingDecoder.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/FakeResolvingDecoder.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/FakeResolvingDecoder.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.io.parsing.Symbol;
+import org.apache.avro.util.Utf8;
+
+/**
+ * Avro uses a ResolvingDecoder which resolves two schemas, converts records
+ * written with one schema so that they can be read with the other, and validates
+ * the input. However, Gora needs to write extra information along with the data,
+ * so the validation is not consistent with the grammar generated by Avro. We
+ * therefore fake the ResolvingDecoder (which is, unfortunately, hard-coded into
+ * GenericDatumReader) until we can write our own GrammarGenerator extending
+ * Avro's ResolvingGrammarGenerator.
+ */
+public class FakeResolvingDecoder extends ResolvingDecoder {
+
+  public FakeResolvingDecoder(Schema schema, Decoder in) throws IOException {
+    super(schema, schema, in);
+  }
+  
+  @Override
+  public long arrayNext() throws IOException {
+    return in.arrayNext();
+  }
+  
+  @Override
+  public Symbol doAction(Symbol input, Symbol top) throws IOException {
+    return null;
+  }
+  
+  @Override
+  public void init(InputStream in) throws IOException {
+    this.in.init(in);
+  }
+  
+  @Override
+  public long mapNext() throws IOException {
+    return in.mapNext();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return in.readDouble();
+  }
+
+  @Override
+  public int readEnum() throws IOException {
+    return in.readEnum();
+  }
+
+  @Override
+  public int readIndex() throws IOException {
+    return in.readIndex();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return in.readLong();
+  }
+
+  @Override
+  public void skipAction() throws IOException {
+  }
+
+  @Override
+  public long readArrayStart() throws IOException {
+    return in.readArrayStart();
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    return in.readBoolean();
+  }
+
+  @Override
+  public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+    return in.readBytes(old);
+  }
+
+  @Override
+  public void readFixed(byte[] bytes, int start, int len) throws IOException {
+    in.readFixed(bytes, start, len);
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return in.readFloat();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return in.readInt();
+  }
+
+  @Override
+  public long readMapStart() throws IOException {
+    return in.readMapStart();
+  }
+
+  @Override
+  public void readNull() throws IOException {
+    in.readNull();
+  }
+
+  @Override
+  public Utf8 readString(Utf8 old) throws IOException {
+    return in.readString(old);
+  }
+
+  @Override
+  public long skipArray() throws IOException {
+    return in.skipArray();
+  }
+
+  @Override
+  public void skipBytes() throws IOException {
+    in.skipBytes();
+  }
+
+  @Override
+  protected void skipFixed() throws IOException {
+  }
+
+  @Override
+  public void skipFixed(int length) throws IOException {
+    in.skipFixed(length);
+  }
+
+  @Override
+  public long skipMap() throws IOException {
+    return in.skipMap();
+  }
+
+  @Override
+  public void skipString() throws IOException {
+  }
+
+  @Override
+  public void skipTopSymbol() throws IOException {
+  }
+
+  @Override
+  public void readFixed(byte[] bytes) throws IOException {
+    in.readFixed(bytes);
+  }
+}
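
A sketch of where the fake decoder slots in, assuming schema, inputStream and datumReader already exist; the fake decoder is handed to the datum reader in place of a real ResolvingDecoder.

    Decoder raw = new BinaryDecoder(inputStream);          // same (deprecated) constructor used by AvroStore
    ResolvingDecoder resolving = new FakeResolvingDecoder(schema, raw);
    Object record = datumReader.read(null, resolving);     // reads with the writer schema, no resolution or validation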

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputFormat.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.FileSplitPartitionQuery;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.FileBackedDataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * {@link InputFormat} to fetch the input from Gora data stores. The
+ * query to fetch the items from the datastore should be prepared and
+ * set via {@link #setQuery(Job, Query)}, before submitting the job.
+ *
+ * <p> The {@link InputSplit}s are prepared from the {@link PartitionQuery}s
+ * obtained by calling {@link DataStore#getPartitions(Query)}.
+ */
+public class GoraInputFormat<K, T extends Persistent>
+  extends InputFormat<K, T> implements Configurable {
+
+  public static final String QUERY_KEY   = "gora.inputformat.query";
+
+  private DataStore<K, T> dataStore;
+
+  private Configuration conf;
+
+  private Query<K, T> query;
+
+  @SuppressWarnings({ "rawtypes" })
+  private void setInputPath(PartitionQuery<K,T> partitionQuery
+      , TaskAttemptContext context) throws IOException {
+    //if the data store is file based
+    if(partitionQuery instanceof FileSplitPartitionQuery) {
+      FileSplit split = ((FileSplitPartitionQuery<K,T>)partitionQuery).getSplit();
+      //set the input path to FileSplit's path.
+      ((FileBackedDataStore)partitionQuery.getDataStore()).setInputPath(
+          split.getPath().toString());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, T> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    PartitionQuery<K,T> partitionQuery = (PartitionQuery<K, T>)
+      ((GoraInputSplit)split).getQuery();
+
+    setInputPath(partitionQuery, context);
+    return new GoraRecordReader<K, T>(partitionQuery);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+
+    List<PartitionQuery<K, T>> queries = dataStore.getPartitions(query);
+    List<InputSplit> splits = new ArrayList<InputSplit>(queries.size());
+
+    for(PartitionQuery<K,T> query : queries) {
+      splits.add(new GoraInputSplit(context.getConfiguration(), query));
+    }
+
+    return splits;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    try {
+      this.query = getQuery(conf);
+      this.dataStore = query.getDataStore();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  public static<K, T extends Persistent> void setQuery(Job job
+      , Query<K, T> query) throws IOException {
+    IOUtils.storeToConf(query, job.getConfiguration(), QUERY_KEY);
+  }
+
+  public Query<K, T> getQuery(Configuration conf) throws IOException {
+    return IOUtils.loadFromConf(conf, QUERY_KEY);
+  }
+
+  /**
+   * Sets the input parameters for the job
+   * @param job the job to set the properties for
+   * @param query the query to get the inputs from
+   * @param reuseObjects whether to reuse objects in serialization
+   * @throws IOException
+   */
+  public static <K1, V1 extends Persistent> void setInput(Job job
+      , Query<K1,V1> query, boolean reuseObjects) throws IOException {
+    setInput(job, query, query.getDataStore(), reuseObjects);
+  }
+
+  /**
+   * Sets the input parameters for the job
+   * @param job the job to set the properties for
+   * @param query the query to get the inputs from
+   * @param dataStore the datastore as the input
+   * @param reuseObjects whether to reuse objects in serialization
+   * @throws IOException
+   */
+  public static <K1, V1 extends Persistent> void setInput(Job job
+      , Query<K1,V1> query, DataStore<K1,V1> dataStore, boolean reuseObjects)
+  throws IOException {
+
+    Configuration conf = job.getConfiguration();
+
+    GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);
+
+    job.setInputFormatClass(GoraInputFormat.class);
+    GoraInputFormat.setQuery(job, query);
+  }
+}
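
A hedged job-setup sketch; store, WebPage and MyMapper are hypothetical, and setInput() below also registers GoraInputFormat as the job's input format:

    Job job = new Job(conf, "gora-scan");
    Query<String, WebPage> query = store.newQuery();
    GoraInputFormat.setInput(job, query, store, true); // serializes the query into the job configuration
    job.setMapperClass(MyMapper.class);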

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputSplit.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputSplit.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputSplit.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraInputSplit.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * InputSplit using {@link PartitionQuery}s. 
+ */
+public class GoraInputSplit extends InputSplit 
+  implements Writable, Configurable {
+
+  protected PartitionQuery<?,?> query;
+  private Configuration conf;
+  
+  public GoraInputSplit() {
+  }
+  
+  public GoraInputSplit(Configuration conf, PartitionQuery<?,?> query) {
+    setConf(conf);
+    this.query = query;
+  }
+  
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  @Override
+  public String[] getLocations() {
+    return query.getLocations();
+  }
+
+  public PartitionQuery<?, ?> getQuery() {
+    return query;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    try {
+      query = (PartitionQuery<?, ?>) IOUtils.deserialize(conf, in, null);
+    } catch (ClassNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtils.serialize(getConf(), out, query);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof GoraInputSplit) {
+      return this.query.equals(((GoraInputSplit)obj).query);
+    }
+    return false;
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapReduceUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapReduceUtils.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapReduceUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapReduceUtils.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gora.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * MapReduce related utilities for Gora
+ */
+public class GoraMapReduceUtils {
+
+  public static class HelperInputFormat<K,V> extends FileInputFormat<K, V> {
+    @Override
+    public RecordReader<K, V> createRecordReader(InputSplit arg0,
+        TaskAttemptContext arg1) throws IOException, InterruptedException {
+      return null;
+    }
+  }
+  
+  public static void setIOSerializations(Configuration conf, boolean reuseObjects) {
+    String serializationClass =
+      PersistentSerialization.class.getCanonicalName();
+    if (!reuseObjects) {
+      serializationClass =
+        PersistentNonReusingSerialization.class.getCanonicalName();
+    }
+    String[] serializations = StringUtils.joinStringArrays(
+        conf.getStrings("io.serializations"), 
+        "org.apache.hadoop.io.serializer.WritableSerialization",
+        StringSerialization.class.getCanonicalName(),
+        serializationClass); 
+    conf.setStrings("io.serializations", serializations);
+  }  
+  
+  public static List<InputSplit> getSplits(Configuration conf, String inputPath) 
+    throws IOException {
+    JobContext context = createJobContext(conf, inputPath);
+    
+    HelperInputFormat<?,?> inputFormat = new HelperInputFormat<Object,Object>();
+    return inputFormat.getSplits(context);
+  }
+  
+  public static JobContext createJobContext(Configuration conf, String inputPath) 
+    throws IOException {
+    
+    if(inputPath != null) {
+      Job job = new Job(conf);
+      FileInputFormat.addInputPath(job, new Path(inputPath));
+      return new JobContext(job.getConfiguration(), null);
+    } 
+    
+    return new JobContext(conf, null);
+  }
+}
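
The effect of setIOSerializations() on a Configuration, as a short sketch:

    Configuration conf = new Configuration();
    GoraMapReduceUtils.setIOSerializations(conf, true);
    // "io.serializations" now also lists WritableSerialization, StringSerialization and
    // PersistentSerialization (PersistentNonReusingSerialization when reuseObjects is false).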

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapper.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapper.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapper.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraMapper.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Optional base class for gora based {@link Mapper}s.
+ */
+public class GoraMapper<K1, V1 extends Persistent, K2, V2>
+extends Mapper<K1, V1, K2, V2> {
+
+  @SuppressWarnings("rawtypes")
+  public static <K1, V1 extends Persistent, K2, V2>
+  void initMapperJob(Job job, Query<K1,V1> query,
+      DataStore<K1,V1> dataStore,
+      Class<K2> outKeyClass, Class<V2> outValueClass,
+      Class<? extends GoraMapper> mapperClass,
+      Class<? extends Partitioner> partitionerClass, boolean reuseObjects)
+  throws IOException {
+    //set the input via GoraInputFormat
+    GoraInputFormat.setInput(job, query, dataStore, reuseObjects);
+
+    job.setMapperClass(mapperClass);
+    job.setMapOutputKeyClass(outKeyClass);
+    job.setMapOutputValueClass(outValueClass);
+
+    if (partitionerClass != null) {
+      job.setPartitionerClass(partitionerClass);
+    }
+  }
+
+  @SuppressWarnings({ "rawtypes" })
+  public static <K1, V1 extends Persistent, K2, V2>
+  void initMapperJob(Job job, Query<K1,V1> query, DataStore<K1,V1> dataStore,
+      Class<K2> outKeyClass, Class<V2> outValueClass,
+      Class<? extends GoraMapper> mapperClass, boolean reuseObjects)
+  throws IOException {
+
+    initMapperJob(job, query, dataStore, outKeyClass, outValueClass,
+        mapperClass, null, reuseObjects);
+  }
+
+
+}
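
A hedged wiring sketch for a Gora-backed map phase; store, WebPage and WordCountMapper (a GoraMapper subclass) are hypothetical:

    GoraMapper.initMapperJob(job, store.newQuery(), store,
        Text.class, LongWritable.class, WordCountMapper.class, true);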

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraOutputFormat.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraOutputFormat.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraOutputFormat.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.FileBackedDataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class GoraOutputFormat<K, T extends Persistent>
+  extends OutputFormat<K, T> {
+
+  public static final String DATA_STORE_CLASS = "gora.outputformat.datastore.class";
+
+  public static final String OUTPUT_KEY_CLASS   = "gora.outputformat.key.class";
+
+  public static final String OUTPUT_VALUE_CLASS = "gora.outputformat.value.class";
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+  throws IOException, InterruptedException { }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    return new NullOutputCommitter();
+  }
+
+  private void setOutputPath(DataStore<K,T> store, TaskAttemptContext context) {
+    if(store instanceof FileBackedDataStore) {
+      FileBackedDataStore<K, T> fileStore = (FileBackedDataStore<K, T>) store;
+      String uniqueName = FileOutputFormat.getUniqueFile(context, "part", "");
+
+      //if file store output is not set, then get the output from FileOutputFormat
+      if(fileStore.getOutputPath() == null) {
+        fileStore.setOutputPath(FileOutputFormat.getOutputPath(context).toString());
+      }
+
+      //set the unique name of the data file
+      String path = fileStore.getOutputPath();
+      fileStore.setOutputPath( path + Path.SEPARATOR  + uniqueName);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordWriter<K, T> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Class<? extends DataStore<K,T>> dataStoreClass
+      = (Class<? extends DataStore<K,T>>) conf.getClass(DATA_STORE_CLASS, null);
+    Class<K> keyClass = (Class<K>) conf.getClass(OUTPUT_KEY_CLASS, null);
+    Class<T> rowClass = (Class<T>) conf.getClass(OUTPUT_VALUE_CLASS, null);
+    final DataStore<K, T> store =
+      DataStoreFactory.createDataStore(dataStoreClass, keyClass, rowClass);
+
+    setOutputPath(store, context);
+
+    return new RecordWriter<K, T>() {
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+        store.close();
+      }
+
+      @Override
+      public void write(K key, T value)
+      throws IOException, InterruptedException {
+        store.put(key, value);
+      }
+    };
+  }
+
+  /**
+   * Sets the output parameters for the job
+   * @param job the job to set the properties for
+   * @param dataStore the datastore as the output
+   * @param reuseObjects whether to reuse objects in serialization
+   */
+  public static <K, V extends Persistent> void setOutput(Job job,
+      DataStore<K,V> dataStore, boolean reuseObjects) {
+    setOutput(job, dataStore.getKeyClass(), dataStore.getPersistentClass(),
+        dataStore.getClass(), reuseObjects);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static <K, V extends Persistent> void setOutput(Job job,
+      Class<K> keyClass, Class<V> persistentClass,
+      Class<? extends DataStore> dataStoreClass,
+      boolean reuseObjects) {
+
+    Configuration conf = job.getConfiguration();
+
+    GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);
+
+    job.setOutputFormatClass(GoraOutputFormat.class);
+    conf.setClass(GoraOutputFormat.DATA_STORE_CLASS, dataStoreClass,
+        DataStore.class);
+    conf.setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, keyClass, Object.class);
+    conf.setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
+        persistentClass, Persistent.class);
+  }
+}
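
A hedged sketch of directing job output into a Gora store; outStore and WebPage are hypothetical:

    // Short form, taking key and value classes from the store instance:
    GoraOutputFormat.setOutput(job, outStore, true);
    // Long form, naming the classes explicitly:
    GoraOutputFormat.setOutput(job, String.class, WebPage.class, DataFileAvroStore.class, true);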

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordReader.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An adapter that exposes a Gora Result as a Hadoop RecordReader.
+ */
+public class GoraRecordReader<K, T extends Persistent> 
+extends RecordReader<K,T> {
+
+  protected Query<K,T> query;
+  protected Result<K,T> result;
+  
+  public GoraRecordReader(Query<K,T> query) {
+    this.query = query;
+  }
+
+  public void executeQuery() throws IOException {
+    this.result = query.execute();
+  }
+  
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return result.getKey();
+  }
+
+  @Override
+  public T getCurrentValue() throws IOException, InterruptedException {
+    return result.get();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return result.getProgress();
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException { }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if(this.result == null) {
+      executeQuery();
+    }
+    
+    return result.next();
+  }
+
+  @Override
+  public void close() throws IOException {
+    result.close();
+  }
+
+}
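
The MapReduce framework drives this reader by calling nextKeyValue(),
getCurrentKey() and getCurrentValue() in a loop for each split. A hedged
fragment exercising the same contract directly; it assumes an existing
DataStore<String, WebPage> named store (WebPage being a hypothetical
Persistent bean) and a method declaring IOException/InterruptedException.

    // Drive the reader over a query, mirroring the framework's read loop.
    Query<String, WebPage> query = store.newQuery();
    GoraRecordReader<String, WebPage> reader =
        new GoraRecordReader<String, WebPage>(query);
    try {
      while (reader.nextKeyValue()) {       // first call lazily executes the query
        String key = reader.getCurrentKey();
        WebPage row = reader.getCurrentValue();
        // ... process key/row ...
      }
    } finally {
      reader.close();                       // closes the underlying Result
    }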

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraReducer.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraReducer.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraReducer.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/GoraReducer.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Optional base class for Gora-based {@link Reducer}s.
+ */
+public class GoraReducer<K1, V1, K2, V2 extends Persistent>
+extends Reducer<K1, V1, K2, V2> {
+
+  public static <K1, V1, K2, V2 extends Persistent>
+  void initReducerJob(Job job, DataStore<K2,V2> dataStore,
+      Class<? extends GoraReducer<K1, V1, K2, V2>> reducerClass) {
+    initReducerJob(job, dataStore, reducerClass, true);
+  }
+
+  public static <K1, V1, K2, V2 extends Persistent>
+  void initReducerJob(Job job, DataStore<K2,V2> dataStore,
+      Class<? extends GoraReducer<K1, V1, K2, V2>> reducerClass,
+          boolean reuseObjects) {
+    
+    GoraOutputFormat.setOutput(job, dataStore, reuseObjects);
+    
+    job.setReducerClass(reducerClass);
+  }
+}
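
A hedged sketch of the intended usage: subclass GoraReducer and let
initReducerJob() register both the reducer class and GoraOutputFormat.
WebPage is a hypothetical Persistent bean and the LongWritable values are
illustrative only.

    // Hypothetical reducer that emits one persistent row per key.
    public class WebPageReducer
        extends GoraReducer<String, LongWritable, String, WebPage> {
      @Override
      protected void reduce(String key, Iterable<LongWritable> values,
          Context context) throws IOException, InterruptedException {
        WebPage page = new WebPage();       // assumed generated Persistent bean
        // ... populate page from the values ...
        context.write(key, page);
      }
    }

    // Job wiring; dataStore is a DataStore<String, WebPage> created elsewhere.
    GoraReducer.initReducerJob(job, dataStore, WebPageReducer.class);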

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/NullOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/NullOutputCommitter.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/NullOutputCommitter.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/NullOutputCommitter.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An OutputCommitter that does nothing.
+ */
+public class NullOutputCommitter extends OutputCommitter {
+
+  @Override
+  public void abortTask(TaskAttemptContext arg0) throws IOException {
+  }
+
+  @Override
+  public void cleanupJob(JobContext arg0) throws IOException {
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext arg0) throws IOException {
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void setupJob(JobContext arg0) throws IOException {
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext arg0) throws IOException {
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentDeserializer.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.util.AvroUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+
+/**
+* Hadoop deserializer using {@link PersistentDatumReader}
+* with {@link BinaryDecoder}.
+*/
+public class PersistentDeserializer
+   implements Deserializer<Persistent> {
+
+  private BinaryDecoder decoder;
+  private Class<? extends Persistent> persistentClass;
+  private boolean reuseObjects;
+  private PersistentDatumReader<Persistent> datumReader;
+
+  public PersistentDeserializer(Class<? extends Persistent> c, boolean reuseObjects) {
+    this.persistentClass = c;
+    this.reuseObjects = reuseObjects;
+    try {
+      Schema schema = AvroUtils.getSchema(persistentClass);
+      datumReader = new PersistentDatumReader<Persistent>(schema, true);
+
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public void open(InputStream in) throws IOException {
+    /* It is very important to use a direct (non-buffering) decoder, since
+     * Hadoop supplies an input stream that is only valid until the end of
+     * one record serialization. Each time deserialize() is called, the input
+     * stream is advanced to point to the right location, so we must not
+     * buffer the whole input stream at once.
+     */
+    decoder = new DecoderFactory().configureDirectDecoder(true)
+      .createBinaryDecoder(in, decoder);
+  }
+
+  @Override
+  public void close() throws IOException { }
+
+  @Override
+  public Persistent deserialize(Persistent persistent) throws IOException {
+    return datumReader.read(reuseObjects ? persistent : null, decoder);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentNonReusingSerialization.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+public class PersistentNonReusingSerialization
+implements Serialization<Persistent> {
+
+  @Override
+  public boolean accept(Class<?> c) {
+    return Persistent.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Deserializer<Persistent> getDeserializer(Class<Persistent> c) {
+    return new PersistentDeserializer(c, false);
+  }
+
+  @Override
+  public Serializer<Persistent> getSerializer(Class<Persistent> c) {
+    return new PersistentSerializer();
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerialization.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+public class PersistentSerialization
+implements Serialization<Persistent> {
+
+  @Override
+  public boolean accept(Class<?> c) {
+    return Persistent.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Deserializer<Persistent> getDeserializer(Class<Persistent> c) {
+    return new PersistentDeserializer(c, true);
+  }
+
+  @Override
+  public Serializer<Persistent> getSerializer(Class<Persistent> c) {
+    return new PersistentSerializer();
+  }
+}
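
These Serialization implementations only take effect once they are listed
under Hadoop's "io.serializations" configuration key. The
GoraMapReduceUtils.setIOSerializations(conf, reuseObjects) call referenced
from GoraOutputFormat.setOutput() presumably performs that registration (its
source is not part of this excerpt); a hedged sketch of the equivalent manual
step:

    // Register the Gora serializations alongside Hadoop's default
    // WritableSerialization so Persistent and String map/reduce types work.
    Configuration conf = job.getConfiguration();
    conf.setStrings("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization",
        StringSerialization.class.getName(),
        PersistentSerialization.class.getName());  // or PersistentNonReusingSerialization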

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/PersistentSerializer.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.gora.avro.PersistentDatumWriter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * Hadoop serializer using {@link PersistentDatumWriter} 
+ * with {@link BinaryEncoder}. 
+ */
+public class PersistentSerializer implements Serializer<Persistent> {
+
+  private PersistentDatumWriter<Persistent> datumWriter;
+  private BinaryEncoder encoder;  
+  
+  public PersistentSerializer() {
+    this.datumWriter = new PersistentDatumWriter<Persistent>();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    encoder.flush();
+  }
+
+  @Override
+  public void open(OutputStream out) throws IOException {
+    encoder = new BinaryEncoder(out);
+  }
+
+  @Override
+  public void serialize(Persistent persistent) throws IOException {   
+    datumWriter.setSchema(persistent.getSchema());
+    datumWriter.setPersistent(persistent);
+        
+    datumWriter.write(persistent, encoder);
+  }
+}
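
A hedged round-trip fragment pairing this serializer with the
PersistentDeserializer shown earlier; WebPage is a hypothetical generated
Persistent bean and the in-memory streams stand in for Hadoop's intermediate
data (assumes java.io.ByteArrayInputStream/ByteArrayOutputStream imports).

    // Serialize one persistent object to a byte array and read it back.
    WebPage page = new WebPage();                       // assumed generated bean
    PersistentSerializer serializer = new PersistentSerializer();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    serializer.open(out);
    serializer.serialize(page);
    serializer.close();                                 // flushes the BinaryEncoder

    PersistentDeserializer deserializer =
        new PersistentDeserializer(WebPage.class, false);  // false = no object reuse
    deserializer.open(new ByteArrayInputStream(out.toByteArray()));
    WebPage copy = (WebPage) deserializer.deserialize(null);
    deserializer.close();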

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringComparator.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringComparator.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringComparator.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringComparator.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+
+public class StringComparator implements RawComparator<String> {
+
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    return Text.Comparator.compareBytes(b1, s1, l1, b2, s2, l2);
+  }
+
+  @Override
+  public int compare(String o1, String o2) {
+    return o1.compareTo(o2);
+  }
+
+}
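
Since java.lang.String is not a WritableComparable, a job that uses String
map-output keys presumably needs this raw comparator registered explicitly
(GoraMapReduceUtils may already do so; its source is not in this excerpt). A
hedged one-liner:

    // Sort serialized String map-output keys with the raw comparator above.
    job.setSortComparatorClass(StringComparator.class);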

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringSerialization.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringSerialization.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringSerialization.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/mapreduce/StringSerialization.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mapreduce;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+public class StringSerialization implements Serialization<String> {
+
+  @Override
+  public boolean accept(Class<?> c) {
+    return c.equals(String.class);
+  }
+
+  @Override
+  public Deserializer<String> getDeserializer(Class<String> c) {
+    return new Deserializer<String>() {
+      private DataInputStream in;
+
+      @Override
+      public void open(InputStream in) throws IOException {
+        this.in = new DataInputStream(in);
+      }
+
+      @Override
+      public void close() throws IOException {
+        this.in.close();
+      }
+
+      @Override
+      public String deserialize(String t) throws IOException {
+        return Text.readString(in);
+      }
+    };
+  }
+
+  @Override
+  public Serializer<String> getSerializer(Class<String> c) {
+    return new Serializer<String>() {
+
+      private DataOutputStream out;
+
+      @Override
+      public void close() throws IOException {
+        this.out.close();
+      }
+
+      @Override
+      public void open(OutputStream out) throws IOException {
+        this.out = new DataOutputStream(out);
+      }
+
+      @Override
+      public void serialize(String str) throws IOException {
+        Text.writeString(out, str);
+      }
+    };
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/memory/store/MemStore.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.memory.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.impl.DataStoreBase;
+
+/**
+ * Memory based {@link DataStore} implementation for tests.
+ */
+public class MemStore<K, T extends Persistent> extends DataStoreBase<K, T> {
+
+  public static class MemQuery<K, T extends Persistent> extends QueryBase<K, T> {
+    public MemQuery() {
+      super(null);
+    }
+    public MemQuery(DataStore<K, T> dataStore) {
+      super(dataStore);
+    }
+  }
+
+  public static class MemResult<K, T extends Persistent> extends ResultBase<K, T> {
+    private NavigableMap<K, T> map;
+    private Iterator<K> iterator;
+    public MemResult(DataStore<K, T> dataStore, Query<K, T> query
+        , NavigableMap<K, T> map) {
+      super(dataStore, query);
+      this.map = map;
+      iterator = map.navigableKeySet().iterator();
+    }
+    @Override
+    public void close() throws IOException { }
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+
+    @Override
+    protected void clear() {  } //do not clear the object in the store
+
+    @Override
+    public boolean nextInner() throws IOException {
+      if(!iterator.hasNext()) {
+        return false;
+      }
+
+      key = iterator.next();
+      persistent = map.get(key);
+
+      return true;
+    }
+  }
+
+  private TreeMap<K, T> map = new TreeMap<K, T>();
+
+  @Override
+  public String getSchemaName() {
+    return "default";
+  }
+
+  @Override
+  public boolean delete(K key) throws IOException {
+    return map.remove(key) != null;
+  }
+
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws IOException {
+    long deletedRows = 0;
+    Result<K,T> result = query.execute();
+
+    while(result.next()) {
+      if(delete(result.getKey()))
+        deletedRows++;
+    }
+
+    return deletedRows;
+  }
+
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws IOException {
+    K startKey = query.getStartKey();
+    K endKey = query.getEndKey();
+    if(startKey == null) {
+      startKey = map.firstKey();
+    }
+    if(endKey == null) {
+      endKey = map.lastKey();
+    }
+
+    //if query.fields is null, default to querying all fields
+    query.setFields(getFieldsToQuery(query.getFields()));
+
+    NavigableMap<K, T> submap = map.subMap(startKey, true, endKey, true);
+
+    return new MemResult<K,T>(this, query, submap);
+  }
+
+  @Override
+  public T get(K key, String[] fields) throws IOException {
+    T obj = map.get(key);
+    return getPersistent(obj, getFieldsToQuery(fields));
+  }
+
+  /**
+   * Returns a clone with exactly the requested fields shallowly copied
+   */
+  @SuppressWarnings("unchecked")
+  private static<T extends Persistent> T getPersistent(T obj, String[] fields) {
+    if(Arrays.equals(fields, obj.getFields())) {
+      return obj;
+    }
+    T newObj = (T) obj.newInstance(new StateManagerImpl());
+    for(String field:fields) {
+      int index = newObj.getFieldIndex(field);
+      newObj.put(index, obj.get(index));
+    }
+    return newObj;
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    return new MemQuery<K, T>(this);
+  }
+
+  @Override
+  public void put(K key, T obj) throws IOException {
+    map.put(key, obj);
+  }
+
+  /**
+   * Returns a single partition containing the original query.
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+      throws IOException {
+    List<PartitionQuery<K, T>> list = new ArrayList<PartitionQuery<K,T>>();
+    list.add(new PartitionQueryImpl<K, T>(query));
+    return list;
+  }
+
+  @Override
+  public void close() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public void createSchema() throws IOException { }
+
+  @Override
+  public void deleteSchema() throws IOException {
+    map.clear();
+  }
+
+  @Override
+  public boolean schemaExists() throws IOException {
+    return true;
+  }
+
+  @Override
+  public void flush() throws IOException { }
+}
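
A hedged test-style sketch of the in-memory store above, created through the
same DataStoreFactory call used by GoraOutputFormat.getRecordWriter(); WebPage
is a hypothetical Persistent bean, the exact generics of createDataStore are
not shown in this excerpt (an unchecked cast may be needed), and the query
setters are assumed to exist on QueryBase.

    // Put a couple of rows, read one back, then run a key-range query.
    DataStore<String, WebPage> store = DataStoreFactory.createDataStore(
        MemStore.class, String.class, WebPage.class);
    store.put("key1", new WebPage());
    store.put("key2", new WebPage());

    WebPage fetched = store.get("key1", null);   // null fields -> all fields

    Query<String, WebPage> query = store.newQuery();
    query.setStartKey("key1");                   // assumed setter on QueryBase
    query.setEndKey("key2");
    Result<String, WebPage> result = store.execute(query);
    while (result.next()) {
      System.out.println(result.getKey());
    }
    result.close();
    store.close();                               // clears the backing map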

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/persistency/BeanFactory.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.persistency;
+
+/**
+ * BeanFactories enable construction of keys and Persistent objects.
+ */
+public interface BeanFactory<K, T extends Persistent> {
+
+  /**
+   * Constructs a new instance of the key class
+   * @return a new instance of the key class
+   */
+  public abstract K newKey() throws Exception;
+
+  /**
+   * Constructs a new instance of the Persistent class
+   * @return a new instance of the Persistent class
+   */
+  public abstract T newPersistent();
+
+  /**
+   * Returns an instance of the key object to be 
+   * used to access static fields of the object. Returned object MUST  
+   * be treated as read-only. No fields other than the static fields 
+   * of the object should be assumed to be readable. 
+   * @return a cached instance of the key object
+   */
+  public abstract K getCachedKey();
+  
+  /**
+   * Returns an instance of the {@link Persistent} object to be 
+   * used to access static fields of the object. Returned object MUST  
+   * be treated as read-only. No fields other than the static fields 
+   * of the object should be assumed to be readable. 
+   * @return a cached instance of the Persistent object
+   */
+  public abstract T getCachedPersistent();
+
+  /**
+   * Returns the key class
+   * @return the key class
+   */
+  public abstract Class<K> getKeyClass();
+
+  /**
+   * Returns the persistent class
+   * @return the persistent class
+   */
+  public abstract Class<T> getPersistentClass();
+
+}
\ No newline at end of file
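
A hedged sketch of how a caller might use this interface; no concrete
implementation is included in this excerpt, so the factory instance is assumed
to come from elsewhere.

    // Generic helper: create a fresh key/row pair via a BeanFactory and store it.
    public static <K, T extends Persistent> void putNewRow(
        BeanFactory<K, T> factory, DataStore<K, T> store) throws Exception {
      K key = factory.newKey();            // new key instance
      T row = factory.newPersistent();     // new persistent instance
      store.put(key, row);                 // DataStore.put, as implemented in MemStore above
    }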


