incubator-gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1021390 [5/9] - in /incubator/gora/trunk: ./ bin/ conf/ docs/ gora-cassandra/ gora-cassandra/ivy/ gora-cassandra/src/examples/java/ gora-cassandra/src/main/java/org/apache/ gora-cassandra/src/main/java/org/apache/gora/ gora-cassandra/src/m...
Date Mon, 11 Oct 2010 15:40:29 GMT
Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/DataStoreBase.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.avro.PersistentDatumWriter;
+import org.apache.gora.persistency.BeanFactory;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A base class for {@link DataStore}s. It keeps the key and persistent
+ * classes, the Avro schema of the persistent class (plus a name-to-field map
+ * of it), the {@link BeanFactory}, the Hadoop {@link Configuration} and the
+ * properties the store was initialized with.
+ */
+public abstract class DataStoreBase<K, T extends Persistent>
+implements DataStore<K, T> {
+
+  protected BeanFactory<K, T> beanFactory;
+
+  protected Class<K> keyClass;
+  protected Class<T> persistentClass;
+
+  /** The schema of the persistent class. */
+  protected Schema schema;
+
+  /** A map of field names to Field objects containing the schema's fields. */
+  protected Map<String, Field> fieldMap;
+
+  protected Configuration conf;
+
+  protected boolean autoCreateSchema;
+
+  protected Properties properties;
+
+  protected PersistentDatumReader<T> datumReader;
+
+  protected PersistentDatumWriter<T> datumWriter;
+
+  public DataStoreBase() {
+  }
+
+  /**
+   * Initializes the store for the given key and persistent classes.
+   * A {@link BeanFactoryImpl} is created unless a bean factory was set
+   * beforehand. The schema and field map are taken from the factory's cached
+   * persistent instance, and a datum reader/writer is created for the schema.
+   *
+   * @param keyClass the class of the keys
+   * @param persistentClass the class of the persistent objects
+   * @param properties the properties used to configure the store
+   * @throws IOException if initialization fails
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    setKeyClass(keyClass);
+    setPersistentClass(persistentClass);
+    if(this.beanFactory == null)
+      this.beanFactory = new BeanFactoryImpl<K, T>(keyClass, persistentClass);
+    schema = this.beanFactory.getCachedPersistent().getSchema();
+    fieldMap = AvroUtils.getFieldMap(schema);
+
+    autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this);
+    this.properties = properties;
+
+    datumReader = new PersistentDatumReader<T>(schema, false);
+    datumWriter = new PersistentDatumWriter<T>(schema, false);
+  }
+
+  @Override
+  public void setPersistentClass(Class<T> persistentClass) {
+    this.persistentClass = persistentClass;
+  }
+
+  @Override
+  public Class<T> getPersistentClass() {
+    return persistentClass;
+  }
+
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  /** Sets the key class; a null argument is ignored. */
+  @Override
+  public void setKeyClass(Class<K> keyClass) {
+    if(keyClass != null)
+      this.keyClass = keyClass;
+  }
+
+  /**
+   * Creates a new key through the bean factory.
+   * @throws IOException wrapping any failure of the bean factory
+   */
+  @Override
+  public K newKey() throws IOException {
+    try {
+      return beanFactory.newKey();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Creates a new persistent object through the bean factory.
+   * @throws IOException wrapping any failure of the bean factory
+   */
+  @Override
+  public T newPersistent() throws IOException {
+    try {
+      return beanFactory.newPersistent();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  public void setBeanFactory(BeanFactory<K, T> beanFactory) {
+    this.beanFactory = beanFactory;
+  }
+
+  @Override
+  public BeanFactory<K, T> getBeanFactory() {
+    return beanFactory;
+  }
+
+  /** Equivalent to {@code get(key, null)}. */
+  @Override
+  public T get(K key) throws IOException {
+    return get(key, null);
+  }
+
+  /**
+   * Checks whether the fields argument is null, and if so
+   * returns all the fields of the Persistent object, else returns the
+   * argument.
+   */
+  protected String[] getFieldsToQuery(String[] fields) {
+    if(fields != null) {
+      return fields;
+    }
+    return beanFactory.getCachedPersistent().getFields();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Returns the configuration, creating (and keeping) a default
+   * {@link Configuration} if none has been set.
+   */
+  protected Configuration getOrCreateConf() {
+    if(conf == null) {
+      conf = new Configuration();
+    }
+    return conf;
+  }
+
+  /**
+   * Reads the key and persistent class names written by {@link #write} and
+   * re-initializes the store with {@code DataStoreFactory.properties}.
+   * @throws IOException if a class cannot be found or initialization fails
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    try {
+      Class<K> keyClass = (Class<K>) Class.forName(Text.readString(in));
+      Class<T> persistentClass = (Class<T>)Class.forName(Text.readString(in));
+      initialize(keyClass, persistentClass, DataStoreFactory.properties);
+
+    } catch (ClassNotFoundException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Writes the key and persistent class names, which {@link #readFields}
+   * resolves with {@link Class#forName(String)}. Binary names
+   * ({@link Class#getName()}) are written because Class.forName cannot
+   * resolve canonical names of nested classes ("a.Outer.Inner" instead of
+   * "a.Outer$Inner"), and local/anonymous classes have no canonical name.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, getKeyClass().getName());
+    Text.writeString(out, getPersistentClass().getName());
+  }
+
+  /**
+   * Two stores are equal when both are DataStoreBase instances with the
+   * same key class and the same persistent class.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof DataStoreBase) {
+      @SuppressWarnings("rawtypes")
+      DataStoreBase that = (DataStoreBase) obj;
+      EqualsBuilder builder = new EqualsBuilder();
+      builder.append(this.keyClass, that.keyClass);
+      builder.append(this.persistentClass, that.persistentClass);
+      return builder.isEquals();
+    }
+    return false;
+  }
+
+  /**
+   * Hash code consistent with {@link #equals(Object)}: it is derived only
+   * from the key class and the persistent class.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + (keyClass == null ? 0 : keyClass.hashCode());
+    result = 31 * result
+        + (persistentClass == null ? 0 : persistentClass.hashCode());
+    return result;
+  }
+
+  /** Default implementation deletes and recreates the schema. */
+  @Override
+  public void truncateSchema() throws IOException {
+    deleteSchema();
+    createSchema();
+  }
+
+  /**
+   * Returns the name of the schema to use for the persistent class. The
+   * default schema name from the properties is returned first, if defined.
+   * Otherwise the schema name read from the mapping file is returned, if not
+   * null. Otherwise the class name of the persistent class, without the
+   * package, is returned.
+   * @param mappingSchemaName the name of the schema as read from the mapping file
+   * @param persistentClass persistent class
+   * @return the schema name to use
+   */
+  protected String getSchemaName(String mappingSchemaName, Class<?> persistentClass) {
+    String schemaName = DataStoreFactory.getDefaultSchemaName(properties, this);
+    if(schemaName != null) {
+      return schemaName;
+    }
+
+    if(mappingSchemaName != null) {
+      return mappingSchemaName;
+    }
+
+    return StringUtils.getClassname(persistentClass);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/store/impl/FileBackedDataStoreBase.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.FileSplitPartitionQuery;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.FileBackedDataStore;
+import org.apache.gora.util.OperationNotSupportedException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Base implementations for {@link FileBackedDataStore} methods: input/output
+ * path and stream handling, file-split based partitioning, and Writable
+ * serialization of the input/output paths.
+ */
+public abstract class FileBackedDataStoreBase<K, T extends Persistent>
+  extends DataStoreBase<K, T> implements FileBackedDataStore<K, T> {
+
+  protected long inputSize; //input size in bytes
+
+  protected String inputPath;
+  protected String outputPath;
+
+  protected InputStream inputStream;
+  protected OutputStream outputStream;
+
+  /**
+   * Initializes the store. When properties are given, the input and output
+   * paths are read from them unless they were already set explicitly.
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+    if(properties != null) {
+      if(this.inputPath == null) {
+        this.inputPath = DataStoreFactory.getInputPath(properties, this);
+      }
+      if(this.outputPath == null) {
+        this.outputPath = DataStoreFactory.getOutputPath(properties, this);
+      }
+    }
+  }
+
+  @Override
+  public void setInputPath(String inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  @Override
+  public void setOutputPath(String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  @Override
+  public String getInputPath() {
+    return inputPath;
+  }
+
+  @Override
+  public String getOutputPath() {
+    return outputPath;
+  }
+
+  @Override
+  public void setInputStream(InputStream inputStream) {
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public void setOutputStream(OutputStream outputStream) {
+    this.outputStream = outputStream;
+  }
+
+  @Override
+  public InputStream getInputStream() {
+    return inputStream;
+  }
+
+  @Override
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Opens an InputStream for the input Hadoop path and records the file's
+   * length in {@link #inputSize}. A default Configuration is created when
+   * none has been set, instead of handing a null configuration to Hadoop.
+   */
+  protected InputStream createInputStream() throws IOException {
+    //TODO: if input path is a directory, use smt like MultiInputStream to
+    //read all the files recursively
+    Path path = new Path(inputPath);
+    FileSystem fs = path.getFileSystem(getOrCreateConf());
+    inputSize = fs.getFileStatus(path).getLen();
+    return fs.open(path);
+  }
+
+  /**
+   * Opens an OutputStream for the output Hadoop path. A default
+   * Configuration is created when none has been set.
+   */
+  protected OutputStream createOutputStream() throws IOException {
+    Path path = new Path(outputPath);
+    FileSystem fs = path.getFileSystem(getOrCreateConf());
+    return fs.create(path);
+  }
+
+  /** Returns the current input stream, opening it on first use. */
+  protected InputStream getOrCreateInputStream() throws IOException {
+    if(inputStream == null) {
+      inputStream = createInputStream();
+    }
+    return inputStream;
+  }
+
+  /** Returns the current output stream, opening it on first use. */
+  protected OutputStream getOrCreateOutputStream() throws IOException {
+    if(outputStream == null) {
+      outputStream = createOutputStream();
+    }
+    return outputStream;
+  }
+
+  /**
+   * Creates one {@link FileSplitPartitionQuery} per input split of the
+   * input path. A default Configuration is created when none has been set.
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+      throws IOException {
+    List<InputSplit> splits =
+        GoraMapReduceUtils.getSplits(getOrCreateConf(), inputPath);
+    List<PartitionQuery<K, T>> queries = new ArrayList<PartitionQuery<K,T>>(splits.size());
+
+    for(InputSplit split : splits) {
+      queries.add(new FileSplitPartitionQuery<K, T>(query, (FileSplit) split));
+    }
+
+    return queries;
+  }
+
+  /**
+   * Dispatches to {@link #executePartial} for file split partition queries
+   * and to {@link #executeQuery} for everything else.
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws IOException {
+    if(query instanceof FileSplitPartitionQuery) {
+      return executePartial((FileSplitPartitionQuery<K, T>) query);
+    } else {
+      return executeQuery(query);
+    }
+  }
+
+  /**
+   * Executes a normal Query reading the whole data. {@link #execute} calls
+   * this function for queries that are not file split partition queries.
+   */
+  protected abstract Result<K,T> executeQuery(Query<K,T> query)
+    throws IOException;
+
+  /**
+   * Executes a partition query, reading the data between the start and end
+   * of its file split.
+   */
+  protected abstract Result<K,T> executePartial(FileSplitPartitionQuery<K,T> query)
+    throws IOException;
+
+  /** Flushes the output stream, if one is open. */
+  @Override
+  public void flush() throws IOException {
+    if(outputStream != null) {
+      outputStream.flush();
+    }
+  }
+
+  /** No-op: file backed stores have no schema to create. */
+  @Override
+  public void createSchema() throws IOException {
+  }
+
+  /** Always throws: deleting the schema is not supported for files. */
+  @Override
+  public void deleteSchema() throws IOException {
+    throw new OperationNotSupportedException("delete schema is not supported for " +
+        "file backed data stores");
+  }
+
+  /** Always returns true for file backed stores. */
+  @Override
+  public boolean schemaExists() throws IOException {
+    return true;
+  }
+
+  /**
+   * Writes the base store state, then a null-field marker for
+   * (inputPath, outputPath), followed by whichever of the two paths are set.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    org.apache.gora.util.IOUtils.writeNullFieldsInfo(out, inputPath, outputPath);
+    if(inputPath != null)
+      Text.writeString(out, inputPath);
+    if(outputPath != null)
+      Text.writeString(out, outputPath);
+  }
+
+  /** Reads back the state written by {@link #write}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    boolean[] nullFields = org.apache.gora.util.IOUtils.readNullFieldsInfo(in);
+    if(!nullFields[0])
+      inputPath = Text.readString(in);
+    if(!nullFields[1])
+      outputPath = Text.readString(in);
+  }
+
+  /** Closes both streams (quietly, via Hadoop's IOUtils) and forgets them. */
+  @Override
+  public void close() throws IOException {
+    IOUtils.closeStream(inputStream);
+    IOUtils.closeStream(outputStream);
+    inputStream = null;
+    outputStream = null;
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/AvroUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/AvroUtils.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/AvroUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/AvroUtils.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.gora.persistency.Persistent;
+
+/**
+ * A utility class for Avro related tasks.
+ */
+public class AvroUtils {
+
+  /**
+   * Indexes the fields of a schema by name.
+   *
+   * @param schema the schema whose {@link Schema#getFields()} are indexed
+   * @return a map from {@link Field#name()} to the field itself
+   */
+  public static Map<String, Field> getFieldMap(Schema schema) {
+    List<Field> schemaFields = schema.getFields();
+    Map<String, Field> byName = new HashMap<String, Field>(schemaFields.size());
+    for (Field f : schemaFields) {
+      byName.put(f.name(), f);
+    }
+    return byName;
+  }
+
+  /**
+   * Looks up an enum constant by symbol, using the Java class that Avro's
+   * {@link ReflectData} maps the schema to.
+   */
+  @SuppressWarnings("unchecked")
+  public static Object getEnumValue(Schema schema, String symbol) {
+    return Enum.valueOf(ReflectData.get().getClass(schema), symbol);
+  }
+
+  /**
+   * Looks up an enum constant by its position in the schema's symbol list.
+   */
+  public static Object getEnumValue(Schema schema, int enumOrdinal) {
+    return getEnumValue(schema, schema.getEnumSymbols().get(enumOrdinal));
+  }
+
+  /**
+   * Returns the schema held in the {@code _SCHEMA} field declared by the
+   * class. The field is read with a null receiver, so it is expected to be
+   * static.
+   */
+  public static Schema getSchema(Class<? extends Persistent> clazz)
+    throws SecurityException, NoSuchFieldException
+    , IllegalArgumentException, IllegalAccessException {
+    return (Schema) clazz.getDeclaredField("_SCHEMA").get(null);
+  }
+
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ByteUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ByteUtils.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ByteUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ByteUtils.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.avro.PersistentDatumWriter;
+import org.apache.hadoop.io.WritableUtils;
+
+//  This code is copied almost directly from HBase project's Bytes class.
+/**
+ * Utility class that handles byte arrays, conversions to/from other types.
+ *
+ */
+public class ByteUtils {
+
+  /** Number of bytes used to store a boolean. */
+  public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
+
+  /** Number of bytes used to store a byte (same as a boolean). */
+  public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;
+
+  /** Number of bytes used to store a char. */
+  public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE;
+
+  /** Number of bytes used to store a double. */
+  public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE;
+
+  /** Number of bytes used to store a float. */
+  public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE;
+
+  /** Number of bytes used to store an int. */
+  public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE;
+
+  /** Number of bytes used to store a long. */
+  public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE;
+
+  /** Number of bytes used to store a short. */
+  public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE;
+
+  /**
+   * Copies {@code srcLength} bytes of {@code srcBytes}, starting at
+   * {@code srcOffset}, into {@code tgtBytes} at {@code tgtOffset}.
+   * @param tgtBytes destination array
+   * @param tgtOffset write position in the destination
+   * @param srcBytes source array
+   * @param srcOffset read position in the source
+   * @param srcLength number of bytes to copy
+   * @return the destination position just past the copied bytes
+   */
+  public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes,
+      int srcOffset, int srcLength) {
+    final int nextOffset = tgtOffset + srcLength;
+    System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength);
+    return nextOffset;
+  }
+
+  /**
+   * Stores one byte at the given position.
+   * @param bytes destination array
+   * @param offset write position
+   * @param b the byte to store
+   * @return the position just past the stored byte
+   */
+  public static int putByte(byte[] bytes, int offset, byte b) {
+    bytes[offset++] = b;
+    return offset;
+  }
+
+  /**
+   * Returns a new byte array, copied from the passed ByteBuffer.
+   * @param bb A ByteBuffer
+   * @return the byte array
+   */
+  public static byte[] toBytes(ByteBuffer bb) {
+    int length = bb.limit();
+    byte [] result = new byte[length];
+    System.arraycopy(bb.array(), bb.arrayOffset(), result, 0, length);
+    return result;
+  }
+
+  /**
+   * Decodes a whole array as UTF-8.
+   * @param b Presumed UTF-8 encoded byte array; may be null.
+   * @return the decoded String, or null if {@code b} is null
+   */
+  public static String toString(final byte [] b) {
+    return b == null ? null : toString(b, 0, b.length);
+  }
+
+  /**
+   * Decodes two arrays as UTF-8 and joins them with a separator.
+   * Neither array may be null.
+   */
+  public static String toString(final byte [] b1,
+                                String sep,
+                                final byte [] b2) {
+    final String left = toString(b1, 0, b1.length);
+    final String right = toString(b2, 0, b2.length);
+    return left + sep + right;
+  }
+
+  /**
+   * Decodes {@code len} bytes of {@code b}, starting at {@code off}, as UTF-8.
+   * @param b Presumed UTF-8 encoded byte array; may be null.
+   * @param off first byte to decode
+   * @param len number of bytes to decode
+   * @return the decoded String; null if {@code b} is null; "" if len is 0
+   */
+  public static String toString(final byte [] b, int off, int len) {
+    if (b == null) {
+      return null;
+    }
+    if (len == 0) {
+      return "";
+    }
+    try {
+      return new String(b, off, len, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+  /**
+   * Encodes a string as UTF-8.
+   * @param s the string to encode; must not be null
+   * @return the UTF-8 bytes
+   * @throws IllegalArgumentException if {@code s} is null
+   */
+  public static byte[] toBytes(String s) {
+    if (s == null) {
+      throw new IllegalArgumentException("string cannot be null");
+    }
+    try {
+      return s.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  /**
+   * Convert a boolean to a single-byte array.
+   * @param b the value to encode
+   * @return a one-byte array holding -1 for true and 0 for false
+   */
+  public static byte [] toBytes(final boolean b) {
+    return new byte[] { b ? (byte) -1 : (byte) 0 };
+  }
+
+  /**
+   * Decodes a single-byte array: any non-zero byte is true.
+   * @param b a one-byte array
+   * @return True or false.
+   * @throws IllegalArgumentException if {@code b} is null or its length is
+   *         not exactly 1 (an empty array used to fail with
+   *         ArrayIndexOutOfBoundsException instead)
+   */
+  public static boolean toBoolean(final byte [] b) {
+    if (b == null || b.length != 1) {
+      throw new IllegalArgumentException("Array is wrong size");
+    }
+    return b[0] != (byte)0;
+  }
+
+  /**
+   * Encodes a long as 8 bytes, most significant byte first.
+   * @param val the value to encode
+   * @return a new 8-byte array
+   */
+  public static byte[] toBytes(long val) {
+    final byte[] out = new byte[8];
+    for (int idx = out.length - 1; idx >= 0; idx--) {
+      out[idx] = (byte) val;
+      val >>>= 8;
+    }
+    return out;
+  }
+
+  /**
+   * Decodes a long from the first {@link #SIZEOF_LONG} bytes of the array.
+   * @param bytes the encoded bytes
+   * @return the long value, or -1 if the bytes cannot be decoded
+   */
+  public static long toLong(byte[] bytes) {
+    return toLong(bytes, 0);
+  }
+
+  /**
+   * Decodes a long from {@link #SIZEOF_LONG} bytes starting at {@code offset}.
+   * @param bytes the encoded bytes
+   * @param offset position of the first (most significant) byte
+   * @return the long value, or -1 if the bytes cannot be decoded
+   */
+  public static long toLong(byte[] bytes, int offset) {
+    return toLong(bytes, offset, SIZEOF_LONG);
+  }
+
+  /**
+   * Decodes a big-endian long.
+   * @param bytes the encoded bytes
+   * @param offset position of the first (most significant) byte
+   * @param length must equal {@link #SIZEOF_LONG}
+   * @return the long value, or -1 if {@code bytes} is null, {@code length}
+   *         is not SIZEOF_LONG, or the range runs past the end of the array
+   */
+  public static long toLong(byte[] bytes, int offset, final int length) {
+    final boolean invalid = bytes == null || length != SIZEOF_LONG
+        || offset + length > bytes.length;
+    if (invalid) {
+      return -1L;
+    }
+    long result = 0L;
+    for (int pos = offset, end = offset + length; pos < end; pos++) {
+      result = (result << 8) | (bytes[pos] & 0xFFL);
+    }
+    return result;
+  }
+
+  /**
+   * Decodes an IEEE 754 single-precision float from the start of the array.
+   * @param bytes the encoded bytes
+   * @return Float made from passed byte array.
+   */
+  public static float toFloat(byte [] bytes) {
+    return toFloat(bytes, 0);
+  }
+
+  /**
+   * Decodes an IEEE 754 single-precision float whose int bits start at
+   * {@code offset}. If toInt cannot decode the bytes, its -1 result is
+   * reinterpreted as float bits.
+   * @param bytes the encoded bytes
+   * @param offset position of the first byte
+   * @return Float made from passed byte array.
+   */
+  public static float toFloat(byte [] bytes, int offset) {
+    return Float.intBitsToFloat(toInt(bytes, offset));
+  }
+  /**
+   * Encodes a float through its raw IEEE 754 int bits.
+   * @param f the value to encode
+   * @return the float represented as byte []
+   */
+  public static byte [] toBytes(final float f) {
+    return toBytes(Float.floatToRawIntBits(f));
+  }
+
+  /**
+   * Decodes a double from the start of the array.
+   * @param bytes the encoded bytes
+   * @return Return double made from passed bytes.
+   */
+  public static double toDouble(final byte [] bytes) {
+    return toDouble(bytes, 0);
+  }
+
+  /**
+   * Decodes a double whose long bits start at {@code offset}. If toLong
+   * cannot decode the bytes, its -1 result is reinterpreted as double bits.
+   * @param bytes the encoded bytes
+   * @param offset position of the first byte
+   * @return Return double made from passed bytes.
+   */
+  public static double toDouble(final byte [] bytes, final int offset) {
+    return Double.longBitsToDouble(toLong(bytes, offset));
+  }
+
+  /**
+   * Encodes a double through its raw IEEE 754 long bits.
+   * @param d the value to encode
+   * @return the double represented as byte []
+   */
+  public static byte [] toBytes(final double d) {
+    return toBytes(Double.doubleToRawLongBits(d));
+  }
+
+  /**
+   * Encodes an int as 4 bytes, most significant byte first.
+   * @param val the value to encode
+   * @return a new 4-byte array
+   */
+  public static byte[] toBytes(int val) {
+    final byte[] out = new byte[4];
+    for (int idx = out.length - 1; idx >= 0; idx--) {
+      out[idx] = (byte) val;
+      val >>>= 8;
+    }
+    return out;
+  }
+
+  /**
+   * Decodes an int from the first {@link #SIZEOF_INT} bytes of the array.
+   * @param bytes the encoded bytes
+   * @return the int value, or -1 if the bytes cannot be decoded
+   */
+  public static int toInt(byte[] bytes) {
+    return toInt(bytes, 0);
+  }
+
+  /**
+   * Decodes an int from {@link #SIZEOF_INT} bytes starting at {@code offset}.
+   * @param bytes the encoded bytes
+   * @param offset position of the first (most significant) byte
+   * @return the int value, or -1 if the bytes cannot be decoded
+   */
+  public static int toInt(byte[] bytes, int offset) {
+    return toInt(bytes, offset, SIZEOF_INT);
+  }
+
+  /**
+   * Decodes a big-endian int.
+   * @param bytes the encoded bytes
+   * @param offset position of the first (most significant) byte
+   * @param length must equal {@link #SIZEOF_INT}
+   * @return the int value, or -1 if {@code bytes} is null, {@code length}
+   *         is not SIZEOF_INT, or the range runs past the end of the array
+   */
+  public static int toInt(byte[] bytes, int offset, final int length) {
+    final boolean invalid = bytes == null || length != SIZEOF_INT
+        || offset + length > bytes.length;
+    if (invalid) {
+      return -1;
+    }
+    int result = 0;
+    for (int pos = offset, end = offset + length; pos < end; pos++) {
+      result = (result << 8) | (bytes[pos] & 0xFF);
+    }
+    return result;
+  }
+
+  /**
+   * Encodes a short as {@link #SIZEOF_SHORT} bytes, most significant first.
+   * @param val the value to encode
+   * @return the byte array
+   */
+  public static byte[] toBytes(short val) {
+    final byte[] out = new byte[SIZEOF_SHORT];
+    out[0] = (byte) (val >> 8);
+    out[1] = (byte) val;
+    return out;
+  }
+
+  /**
+   * Decodes a short from the first {@link #SIZEOF_SHORT} bytes of the array.
+   * @param bytes the encoded bytes
+   * @return the short value, or -1 if the bytes cannot be decoded
+   */
+  public static short toShort(byte[] bytes) {
+    return toShort(bytes, 0);
+  }
+
+  /**
+   * Decodes a short from {@link #SIZEOF_SHORT} bytes starting at {@code offset}.
+   * @param bytes the encoded bytes
+   * @param offset position of the high byte
+   * @return the short value, or -1 if the bytes cannot be decoded
+   */
+  public static short toShort(byte[] bytes, int offset) {
+    return toShort(bytes, offset, SIZEOF_SHORT);
+  }
+
+  /**
+   * Decodes a big-endian short.
+   * @param bytes the encoded bytes
+   * @param offset position of the high byte
+   * @param length must equal {@link #SIZEOF_SHORT}
+   * @return the short value, or -1 if {@code bytes} is null, {@code length}
+   *         is not SIZEOF_SHORT, or the range runs past the end of the array
+   */
+  public static short toShort(byte[] bytes, int offset, final int length) {
+    if (bytes == null || length != SIZEOF_SHORT
+        || offset + length > bytes.length) {
+      return -1;
+    }
+    final int high = bytes[offset] & 0xFF;
+    final int low = bytes[offset + 1] & 0xFF;
+    return (short) ((high << 8) | low);
+  }
+
+  /**
+   * Encodes a char as {@link #SIZEOF_CHAR} bytes, most significant first.
+   *
+   * @param val the value to encode
+   * @return the byte array
+   */
+  public static byte[] toBytes(char val) {
+    final byte[] out = new byte[SIZEOF_CHAR];
+    out[0] = (byte) (val >> 8);
+    out[1] = (byte) val;
+    return out;
+  }
+
+  /**
+   * Decodes a char from the first {@link #SIZEOF_CHAR} bytes of the array.
+   *
+   * @param bytes the encoded bytes
+   * @return the char value, or (char) -1 if the bytes cannot be decoded
+   */
+  public static char toChar(byte[] bytes) {
+    return toChar(bytes, 0);
+  }
+
+
+  /**
+   * Decodes a char from {@link #SIZEOF_CHAR} bytes starting at {@code offset}.
+   *
+   * @param bytes the encoded bytes
+   * @param offset position of the high byte
+   * @return the char value, or (char) -1 if the bytes cannot be decoded
+   */
+  public static char toChar(byte[] bytes, int offset) {
+    return toChar(bytes, offset, SIZEOF_CHAR);
+  }
+
+  /**
+   * Decodes a big-endian char.
+   *
+   * @param bytes the encoded bytes
+   * @param offset position of the high byte
+   * @param length must equal {@link #SIZEOF_CHAR}
+   * @return the char value, or (char) -1 if {@code bytes} is null,
+   *         {@code length} is not SIZEOF_CHAR, or the range runs past the
+   *         end of the array
+   */
+  public static char toChar(byte[] bytes, int offset, final int length) {
+    if (bytes == null || length != SIZEOF_CHAR
+        || offset + length > bytes.length) {
+      return (char)-1;
+    }
+    final int high = bytes[offset] & 0xFF;
+    final int low = bytes[offset + 1] & 0xFF;
+    return (char) ((high << 8) | low);
+  }
+
+  /**
+   * Converts a whole byte array into chars, two big-endian bytes per char.
+   *
+   * @param bytes the encoded bytes; may be null
+   * @return the chars, or null if {@code bytes} is null or has odd length
+   */
+  public static char[] toChars(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    }
+    return toChars(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Converts the bytes from {@code offset} to the end of the array into
+   * chars, two big-endian bytes per char.
+   *
+   * @param bytes the encoded bytes; may be null
+   * @param offset position of the first byte
+   * @return the chars, or null if {@code bytes} is null or the range is invalid
+   */
+  public static char[] toChars(byte[] bytes, int offset) {
+    if (bytes == null) {
+      return null;
+    }
+    return toChars(bytes, offset, bytes.length - offset);
+  }
+
+  /**
+   * Converts {@code length} bytes starting at {@code offset} into chars,
+   * two big-endian bytes per char.
+   *
+   * @param bytes the encoded bytes; may be null
+   * @param offset position of the first byte
+   * @param length number of bytes to convert; must be even
+   * @return the chars, or null if {@code bytes} is null, {@code offset} or
+   *         {@code length} is negative, the range runs past the end of the
+   *         array, or {@code length} is odd
+   */
+  public static char[] toChars(byte[] bytes, int offset, final int length) {
+    // Compare against (bytes.length - offset) rather than computing
+    // offset + length, which could overflow and slip past the bounds check.
+    if (bytes == null || offset < 0 || length < 0
+        || length > bytes.length - offset || length % 2 != 0) {
+      return null;
+    }
+
+    char[] chars = new char[length / 2];
+    for (int i = 0, j = offset; i < chars.length; i++, j += 2) {
+      chars[i] = (char) (((bytes[j] & 0xFF) << 8) | (bytes[j + 1] & 0xFF));
+    }
+    return chars;
+  }
+
+  /**
+   * Encodes a long in the zero-compressed variable-length layout that
+   * {@code bytesToVlong} and {@code readVLong} decode. Values in [-112, 127]
+   * are written as a single byte. Any other value is written as a prefix byte
+   * (encoding the sign and the payload length) followed by the payload bytes,
+   * most significant byte first. Negative values are stored as their one's
+   * complement.
+   * NOTE(review): intended to match Hadoop's WritableUtils.writeVLong (the
+   * output size comes from WritableUtils.getVIntSize) -- confirm.
+   *
+   * @param vint Integer to make a vint of.
+   * @return Vint as bytes array.
+   */
+  public static byte [] vintToBytes(final long vint) {
+    long i = vint;
+    // Size of the whole encoding; used to allocate the result array.
+    int size = WritableUtils.getVIntSize(i);
+    byte [] result = new byte[size];
+    int offset = 0;
+    // Small values fit directly in the single output byte.
+    if (i >= -112 && i <= 127) {
+      result[offset] = ((byte)i);
+      return result;
+    }
+
+    // The prefix starts at -112 for non-negative values and at -120 for
+    // negative ones; it is decremented once per payload byte below.
+    int len = -112;
+    if (i < 0) {
+      i ^= -1L; // take one's complement'
+      len = -120;
+    }
+
+    // Count the payload bytes needed for the (now non-negative) magnitude.
+    long tmp = i;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+    len--;
+    }
+
+    result[offset++] = (byte)len;
+
+    // Convert the prefix back into a plain payload byte count.
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    // Emit the payload highest byte first.
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftbits;
+      result[offset++] = (byte)((i & mask) >> shiftbits);
+    }
+    return result;
+  }
+
+  /**
+   * Decodes a zero-compressed variable-length long from the start of the
+   * buffer: a prefix byte, then (for values outside the single-byte range)
+   * payload bytes, most significant first.
+   *
+   * @param buffer the encoded bytes; the value must start at index 0
+   * @return the decoded long value
+   */
+  public static long bytesToVlong(final byte [] buffer) {
+    final byte prefix = buffer[0];
+    final int totalSize = WritableUtils.decodeVIntSize(prefix);
+    if (totalSize == 1) {
+      // Small values are stored directly in the prefix byte.
+      return prefix;
+    }
+    long magnitude = 0;
+    for (int pos = 1; pos < totalSize; pos++) {
+      magnitude = (magnitude << 8) | (buffer[pos] & 0xFF);
+    }
+    if (WritableUtils.isNegativeVInt(prefix)) {
+      // Negative values were stored as their one's complement.
+      return ~magnitude;
+    }
+    return magnitude;
+  }
+
+  /**
+   * Decodes a zero-compressed variable-length value from the start of the
+   * buffer and narrows it to an int.
+   *
+   * @param buffer the encoded bytes; the value must start at index 0
+   * @return the decoded value, truncated to its low 32 bits
+   */
+  public static int bytesToVint(final byte [] buffer) {
+    // Decoding is exactly the long variant's; only the result width differs.
+    return (int) bytesToVlong(buffer);
+  }
+
+  /**
+   * Decodes a zero-compressed variable-length long stored in a byte array,
+   * beginning at the given offset.
+   *
+   * @param buffer Binary array
+   * @param offset Offset into array at which vint begins.
+   * @throws java.io.IOException declared for callers; nothing in this body
+   *         throws it directly
+   * @return the decoded long value
+   */
+  public static long readVLong(final byte [] buffer, final int offset)
+  throws IOException {
+    final byte prefix = buffer[offset];
+    final int totalSize = WritableUtils.decodeVIntSize(prefix);
+    if (totalSize == 1) {
+      // Small values are stored directly in the prefix byte.
+      return prefix;
+    }
+    long magnitude = 0;
+    for (int k = 1; k < totalSize; k++) {
+      magnitude = (magnitude << 8) | (buffer[offset + k] & 0xFF);
+    }
+    return WritableUtils.isNegativeVInt(prefix) ? ~magnitude : magnitude;
+  }
+
+  /**
+   * Lexicographically compares two whole byte arrays, treating each byte as
+   * unsigned.
+   *
+   * @param left the first array
+   * @param right the second array
+   * @return 0 if equal, a negative value if left sorts first, a positive
+   *         value otherwise
+   */
+  public static int compareTo(final byte [] left, final byte [] right) {
+    final int leftLength = left.length;
+    final int rightLength = right.length;
+    return compareTo(left, 0, leftLength, right, 0, rightLength);
+  }
+
+  /**
+   * Lexicographically compares two byte ranges, treating each byte as
+   * unsigned. When one range is a prefix of the other, the shorter one
+   * sorts first.
+   *
+   * @param b1 the left buffer
+   * @param s1 Where to start comparing in the left buffer
+   * @param l1 How much to compare from the left buffer
+   * @param b2 the right buffer
+   * @param s2 Where to start comparing in the right buffer
+   * @param l2 How much to compare from the right buffer
+   * @return 0 if equal, a negative value if the left range sorts first, a
+   *         positive value otherwise
+   */
+  public static int compareTo(byte[] b1, int s1, int l1,
+      byte[] b2, int s2, int l2) {
+    // Local copy of the WritableComparator byte comparison.
+    final int end1 = s1 + l1;
+    final int end2 = s2 + l2;
+    int i = s1;
+    int j = s2;
+    while (i < end1 && j < end2) {
+      final int diff = (b1[i++] & 0xff) - (b2[j++] & 0xff);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    return l1 - l2;
+  }
+
+  /**
+   * Tests two byte arrays for equal contents.
+   *
+   * @param left the first array; may be {@code null}
+   * @param right the second array; may be {@code null}
+   * @return {@code true} if both are null, or if both are non-null with the
+   *         same length and the same bytes
+   */
+  public static boolean equals(final byte [] left, final byte [] right) {
+    if (left == null || right == null) {
+      // Equal only when both are null.
+      return left == right;
+    }
+    if (left.length != right.length) {
+      return false;
+    }
+    for (int idx = 0; idx < left.length; idx++) {
+      if (left[idx] != right[idx]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Decodes {@code val} into a value of the given schema's type. This is
+   * the inverse of {@code toBytes(Object, Schema, PersistentDatumWriter)}.
+   *
+   * @param val the encoded bytes
+   * @param schema the schema describing the encoded value
+   * @param datumReader reader used for RECORD, MAP and ARRAY schemas
+   * @param object passed to the reader for RECORD, MAP and ARRAY schemas
+   *        (NOTE(review): presumably an instance to reuse -- confirm)
+   * @return the decoded value
+   * @throws IOException if complex-type deserialization fails
+   * @throws RuntimeException for schema types not handled here
+   */
+  @SuppressWarnings("unchecked")
+  public static Object fromBytes( byte[] val, Schema schema
+      , PersistentDatumReader<?> datumReader, Object object)
+  throws IOException {
+    Type type = schema.getType();
+    switch (type) {
+    case ENUM:
+      // The encoder stores the ordinal as a single byte. Mask it so
+      // ordinals 128..255 are not sign-extended into negative indexes.
+      String symbol = schema.getEnumSymbols().get(val[0] & 0xFF);
+      return Enum.valueOf(ReflectData.get().getClass(schema), symbol);
+    case STRING:  return new Utf8(toString(val));
+    case BYTES:   return ByteBuffer.wrap(val);
+    case INT:     return bytesToVint(val);
+    case LONG:    return bytesToVlong(val);
+    case FLOAT:   return toFloat(val);
+    case DOUBLE:  return toDouble(val);
+    case BOOLEAN: return val[0] != 0;
+    case RECORD:  //fall
+    case MAP:
+    case ARRAY:   return IOUtils.deserialize(val, datumReader, schema, object);
+    default: throw new RuntimeException("Unknown type: "+type);
+    }
+  }
+
+  /**
+   * Encodes {@code o} according to the given schema's type. This is the
+   * inverse of {@code fromBytes}.
+   *
+   * @param o the value to encode; its runtime type must match the schema
+   *        (Utf8, ByteBuffer, Integer, Long, Float, Double, Boolean, Enum, or
+   *        a value the datum writer accepts)
+   * @param schema the schema describing {@code o}
+   * @param datumWriter writer used for RECORD, MAP and ARRAY schemas
+   * @return the encoded bytes
+   * @throws IOException if complex-type serialization fails
+   * @throws RuntimeException for schema types not handled here
+   */
+  public static byte[] toBytes(Object o, Schema schema
+      , PersistentDatumWriter<?> datumWriter)
+  throws IOException {
+    Type type = schema.getType();
+    switch (type) {
+    case STRING:  return toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+    case BYTES: {
+      ByteBuffer buf = (ByteBuffer) o;
+      // array() returns the entire backing array regardless of position,
+      // limit and arrayOffset, and fails for read-only or direct buffers.
+      // Hand back the backing array only when it is exactly the content.
+      if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0
+          && buf.limit() == buf.array().length) {
+        return buf.array();
+      }
+      byte[] content = new byte[buf.remaining()];
+      buf.duplicate().get(content); // duplicate() leaves o's position alone
+      return content;
+    }
+    case INT:     return vintToBytes((Integer)o);
+    case LONG:    return vintToBytes((Long)o);
+    case FLOAT:   return toBytes((Float)o);
+    case DOUBLE:  return toBytes((Double)o);
+    case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
+    case ENUM:
+      // NOTE(review): the ordinal is truncated to one byte, so ordinals
+      // above 255 wrap silently.
+      return new byte[] { (byte)((Enum<?>) o).ordinal() };
+    case RECORD:  //fall
+    case MAP:
+    case ARRAY:   return IOUtils.serialize(datumWriter, schema, o);
+    default: throw new RuntimeException("Unknown type: "+type);
+    }
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/IOUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/IOUtils.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/IOUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/IOUtils.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.ByteBufferInputStream;
+import org.apache.avro.ipc.ByteBufferOutputStream;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.avro.PersistentDatumWriter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+
+/**
+ * A utility class for I/O related functionality.
+ */
+public class IOUtils {
+
+  private static SerializationFactory serializationFactory = null;
+  private static Configuration conf;
+
+  public static final int BUFFER_SIZE = 8192;
+
+  // Reused across calls by the datumReader-based deserialize overloads.
+  // NOTE(review): shared, unsynchronized static state; concurrent callers of
+  // those overloads would race on it.
+  private static BinaryDecoder decoder;
+
+  /**
+   * Returns {@code conf} when it is non-null, otherwise a lazily created
+   * class-wide default {@link Configuration}.
+   */
+  private static Configuration getOrCreateConf(Configuration conf) {
+    if(conf != null) {
+      return conf;
+    }
+    if(IOUtils.conf == null) {
+      IOUtils.conf = new Configuration();
+    }
+    return IOUtils.conf;
+  }
+
+  /**
+   * Reads a Java-serialized object from the given input.
+   *
+   * <p>SECURITY: this uses native Java deserialization; never call it on
+   * data from an untrusted source.</p>
+   *
+   * @param in an {@link ObjectInput}, or a {@link DataInput} that is also an
+   *        {@link InputStream}
+   * @return the deserialized object
+   * @throws IOException if {@code in} is neither kind of input, or reading
+   *         fails
+   */
+  public static Object readObject(DataInput in)
+    throws ClassNotFoundException, IOException {
+
+    if(in instanceof ObjectInput) {
+      return ((ObjectInput)in).readObject();
+    }
+    if(in instanceof InputStream) {
+      ObjectInput objIn = new ObjectInputStream((InputStream)in);
+      return objIn.readObject();
+    }
+    throw new IOException("cannot read from DataInput of instance:"
+        + in.getClass());
+  }
+
+  /**
+   * Writes {@code obj} to the given output using Java serialization.
+   *
+   * @param out an {@link ObjectOutput}, or a {@link DataOutput} that is also
+   *        an {@link OutputStream}
+   * @param obj the object to write
+   * @throws IOException if {@code out} is neither kind of output, or writing
+   *         fails
+   */
+  public static void writeObject(DataOutput out, Object obj)
+    throws IOException {
+    if(out instanceof ObjectOutput) {
+      ((ObjectOutput)out).writeObject(obj);
+      return;
+    }
+    if(out instanceof OutputStream) {
+      ObjectOutput objOut = new ObjectOutputStream((OutputStream)out);
+      objOut.writeObject(obj);
+      // ObjectOutputStream buffers internally; flush so the bytes reach the
+      // caller's stream. The wrapper is not closed, so that stream stays open.
+      objOut.flush();
+      return;
+    }
+    throw new IOException("cannot write to DataOutput of instance:"
+        + out.getClass());
+  }
+
+  /**
+   * Serializes the object to the given DataOutput using the available
+   * Hadoop serializations. The output is a vint byte count followed by the
+   * serialized bytes.
+   *
+   * <p>NOTE(review): the SerializationFactory is created from the first
+   * configuration seen and then cached; later {@code conf} arguments are
+   * ignored.</p>
+   *
+   * @throws IOException if serialization or writing fails
+   */
+  public static<T> void serialize(Configuration conf, DataOutput out
+      , T obj, Class<T> objClass) throws IOException {
+
+    if(serializationFactory == null) {
+      serializationFactory = new SerializationFactory(getOrCreateConf(conf));
+    }
+    Serializer<T> serializer = serializationFactory.getSerializer(objClass);
+
+    ByteBufferOutputStream os = new ByteBufferOutputStream();
+    try {
+      serializer.open(os);
+      serializer.serialize(obj);
+
+      // Each buffer's content is [position, limit), located at arrayOffset
+      // within its backing array.
+      int length = 0;
+      List<ByteBuffer> buffers = os.getBufferList();
+      for(ByteBuffer buffer : buffers) {
+        length += buffer.remaining();
+      }
+
+      WritableUtils.writeVInt(out, length);
+      for(ByteBuffer buffer : buffers) {
+        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+            buffer.remaining());
+      }
+
+    } finally {
+      if(serializer != null) {
+        serializer.close();
+      }
+      os.close();
+    }
+  }
+
+  /**
+   * Serializes the object to the given DataOutput using the available
+   * Hadoop serializations, preceded by its class name so that
+   * {@link #deserialize(Configuration, DataInput, Object)} can read it back.
+   *
+   * @throws IOException if serialization or writing fails
+   */
+  @SuppressWarnings("unchecked")
+  public static<T> void serialize(Configuration conf, DataOutput out
+      , T obj) throws IOException {
+    // getName() is the binary name that Class.forName() expects when reading
+    // back; getCanonicalName() breaks for nested/array classes and is null for
+    // anonymous or local ones.
+    Text.writeString(out, obj.getClass().getName());
+    serialize(conf, out, obj, (Class<T>)obj.getClass());
+  }
+
+  /**
+   * Serializes the object, preceded by its class name, into a byte array
+   * using the available Hadoop serializations.
+   *
+   * @return exactly the bytes written, with no trailing buffer padding
+   */
+  public static<T> byte[] serialize(Configuration conf, T obj) throws IOException {
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    serialize(conf, buffer, obj);
+    // getData() returns the whole backing array, which is usually longer
+    // than the bytes actually written.
+    byte[] result = new byte[buffer.getLength()];
+    System.arraycopy(buffer.getData(), 0, result, 0, result.length);
+    return result;
+  }
+
+
+  /**
+   * Serializes the field object using the datumWriter, writing Avro binary
+   * encoding to {@code os}. The encoder is flushed, but {@code os} is not
+   * closed.
+   */
+  public static<T extends Persistent> void serialize(OutputStream os,
+      PersistentDatumWriter<T> datumWriter, Schema schema, Object object)
+      throws IOException {
+
+    BinaryEncoder encoder = new BinaryEncoder(os);
+    datumWriter.write(schema, object, encoder);
+    encoder.flush();
+  }
+
+  /**
+   * Serializes the field object using the datumWriter and returns the
+   * encoded bytes.
+   */
+  public static<T extends Persistent> byte[] serialize(PersistentDatumWriter<T> datumWriter
+      , Schema schema, Object object) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    serialize(os, datumWriter, schema, object);
+    return os.toByteArray();
+  }
+
+  /**
+   * Deserializes the object in the given DataInput using the available
+   * Hadoop serializations, loading the class by its binary name.
+   *
+   * @throws IOException if reading or deserialization fails
+   * @throws ClassNotFoundException if {@code objClass} cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  public static<T> T deserialize(Configuration conf, DataInput in
+      , T obj , String objClass) throws IOException, ClassNotFoundException {
+
+    Class<T> c = (Class<T>) Class.forName(objClass);
+
+    return deserialize(conf, in, obj, c);
+  }
+
+  /**
+   * Deserializes the object in the given DataInput using the available
+   * Hadoop serializations. It expects the vint-length-prefixed layout written
+   * by {@link #serialize(Configuration, DataOutput, Object, Class)}.
+   *
+   * @throws IOException if reading or deserialization fails
+   */
+  public static<T> T deserialize(Configuration conf, DataInput in
+      , T obj , Class<T> objClass) throws IOException {
+    if(serializationFactory == null) {
+      serializationFactory = new SerializationFactory(getOrCreateConf(conf));
+    }
+    Deserializer<T> deserializer = serializationFactory.getDeserializer(
+        objClass);
+
+    int length = WritableUtils.readVInt(in);
+    byte[] arr = new byte[length];
+    in.readFully(arr);
+    List<ByteBuffer> list = new ArrayList<ByteBuffer>();
+    list.add(ByteBuffer.wrap(arr));
+    ByteBufferInputStream is = new ByteBufferInputStream(list);
+
+    try {
+      deserializer.open(is);
+      return deserializer.deserialize(obj);
+
+    } finally {
+      if(deserializer != null) {
+        deserializer.close();
+      }
+      is.close();
+    }
+  }
+
+  /**
+   * Deserializes an object written by
+   * {@link #serialize(Configuration, DataOutput, Object)}: first the class
+   * name, then the length-prefixed serialized bytes.
+   *
+   * @throws IOException if reading or deserialization fails
+   * @throws ClassNotFoundException if the recorded class cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  public static<T> T deserialize(Configuration conf, DataInput in
+      , T obj) throws IOException, ClassNotFoundException {
+    String clazz = Text.readString(in);
+    Class<T> c = (Class<T>)Class.forName(clazz);
+    return deserialize(conf, in, obj, c);
+  }
+
+  /**
+   * Deserializes an object from bytes produced by
+   * {@link #serialize(Configuration, Object)}.
+   *
+   * @throws IOException if reading or deserialization fails
+   * @throws ClassNotFoundException if the recorded class cannot be loaded
+   */
+  public static<T> T deserialize(Configuration conf, byte[] in
+      , T obj) throws IOException, ClassNotFoundException {
+    DataInputBuffer buffer = new DataInputBuffer();
+    buffer.reset(in, in.length);
+    return deserialize(conf, buffer, obj);
+  }
+
+  /**
+   * Deserializes the field object using the datumReader, reading Avro
+   * binary encoding from {@code is}. It reuses the shared decoder.
+   */
+  @SuppressWarnings("unchecked")
+  public static<K, T extends Persistent> K deserialize(InputStream is,
+      PersistentDatumReader<T> datumReader, Schema schema, K object)
+      throws IOException {
+    decoder = DecoderFactory.defaultFactory().createBinaryDecoder(is, decoder);
+    return (K)datumReader.read(object, schema, decoder);
+  }
+
+  /**
+   * Deserializes the field object using the datumReader, reading Avro
+   * binary encoding from {@code bytes}. It reuses the shared decoder.
+   */
+  @SuppressWarnings("unchecked")
+  public static<K, T extends Persistent> K deserialize(byte[] bytes,
+      PersistentDatumReader<T> datumReader, Schema schema, K object)
+      throws IOException {
+    decoder = DecoderFactory.defaultFactory().createBinaryDecoder(bytes, decoder);
+    return (K)datumReader.read(object, schema, decoder);
+  }
+
+
+  /**
+   * Serializes the field object using the datumWriter. Despite its name, this
+   * method writes rather than reads; it returns the same bytes as
+   * {@link #serialize(PersistentDatumWriter, Schema, Object)}.
+   */
+  public static<T extends Persistent> byte[] deserialize(PersistentDatumWriter<T> datumWriter
+      , Schema schema, Object object) throws IOException {
+    return serialize(datumWriter, schema, object);
+  }
+
+  /**
+   * Writes a byte[] to the output, representing whether each given field is null
+   * or not. A Vint and ceil( fields.length / 8 ) bytes are written to the output.
+   * @param out the output to write to
+   * @param fields the fields to check for null
+   * @see #readNullFieldsInfo(DataInput)
+   */
+  public static void writeNullFieldsInfo(DataOutput out, Object ... fields)
+    throws IOException {
+
+    boolean[] isNull = new boolean[fields.length];
+
+    for(int i=0; i<fields.length; i++) {
+      isNull[i] = (fields[i] == null);
+    }
+
+    writeBoolArray(out, isNull);
+  }
+
+  /**
+   * Reads the data written by {@link #writeNullFieldsInfo(DataOutput, Object...)}
+   * and returns a boolean array representing whether each field is null or not.
+   * @param in the input to read from
+   * @return a boolean[] representing whether each field is null or not.
+   */
+  public static boolean[] readNullFieldsInfo(DataInput in) throws IOException {
+    return readBoolArray(in);
+  }
+
+  /**
+   * Writes a boolean[] to the output as a vint length followed by
+   * ceil(length / 8) bytes. Eight flags are packed per byte, with the
+   * lowest-indexed flag in the lowest bit.
+   */
+  public static void writeBoolArray(DataOutput out, boolean[] boolArray)
+    throws IOException {
+
+    WritableUtils.writeVInt(out, boolArray.length);
+
+    byte b = 0;
+    int i = 0;
+    for(i=0; i<boolArray.length; i++) {
+      if(i % 8 == 0 && i != 0) {
+        out.writeByte(b);
+        b = 0;
+      }
+      b >>= 1;
+      if(boolArray[i])
+        b |= 0x80;
+      else
+        b &= 0x7F;
+    }
+    if(i % 8 != 0) {
+      for(int j=0; j < 8 - (i % 8); j++) { //shift for the remaining byte
+        b >>=1;
+        b &= 0x7F;
+      }
+    }
+
+    // An empty array has no flag bytes. readBoolArray reads none, so writing
+    // one here would leave a stray byte in the stream.
+    if(boolArray.length > 0) {
+      out.writeByte(b);
+    }
+  }
+
+  /**
+   * Reads a boolean[] written by {@link #writeBoolArray(DataOutput, boolean[])}.
+   * @throws IOException
+   */
+  public static boolean[] readBoolArray(DataInput in) throws IOException {
+    int length = WritableUtils.readVInt(in);
+    boolean[] arr = new boolean[length];
+
+    byte b = 0;
+    for(int i=0; i < length; i++) {
+      if(i % 8 == 0) {
+        b = in.readByte();
+      }
+      arr[i] = (b & 0x01) > 0;
+      b >>= 1;
+    }
+    return arr;
+  }
+
+
+  /**
+   * Writes a boolean[] to the Avro encoder as an int length followed by a
+   * fixed block of ceil(length / 8) bytes. Eight flags are packed per byte,
+   * with the lowest-indexed flag in the lowest bit.
+   */
+  public static void writeBoolArray(Encoder out, boolean[] boolArray)
+    throws IOException {
+
+    out.writeInt(boolArray.length);
+
+    int byteArrLength = (int)Math.ceil(boolArray.length / 8.0);
+
+    byte b = 0;
+    byte[] arr = new byte[byteArrLength];
+    int i = 0;
+    int arrIndex = 0;
+    for(i=0; i<boolArray.length; i++) {
+      if(i % 8 == 0 && i != 0) {
+        arr[arrIndex++] = b;
+        b = 0;
+      }
+      b >>= 1;
+      if(boolArray[i])
+        b |= 0x80;
+      else
+        b &= 0x7F;
+    }
+    if(i % 8 != 0) {
+      for(int j=0; j < 8 - (i % 8); j++) { //shift for the remaining byte
+        b >>=1;
+        b &= 0x7F;
+      }
+    }
+
+    // For an empty array, arr has length 0 and there is no last byte to store.
+    if(byteArrLength > 0) {
+      arr[arrIndex++] = b;
+    }
+    out.writeFixed(arr);
+  }
+
+  /**
+   * Reads a boolean[] written by {@link #writeBoolArray(Encoder, boolean[])}.
+   * @throws IOException
+   */
+  public static boolean[] readBoolArray(Decoder in) throws IOException {
+
+    int length = in.readInt();
+    boolean[] boolArr = new boolean[length];
+
+    int byteArrLength = (int)Math.ceil(length / 8.0);
+    byte[] byteArr = new byte[byteArrLength];
+    in.readFixed(byteArr);
+
+    int arrIndex = 0;
+    byte b = 0;
+    for(int i=0; i < length; i++) {
+      if(i % 8 == 0) {
+        b = byteArr[arrIndex++];
+      }
+      boolArr[i] = (b & 0x01) > 0;
+      b >>= 1;
+    }
+    return boolArr;
+  }
+
+  /**
+   * Writes the String array to the given DataOutput.
+   * @param out the data output to write to
+   * @param arr the array to write
+   * @see #readStringArray(DataInput)
+   */
+  public static void writeStringArray(DataOutput out, String[] arr)
+    throws IOException {
+    WritableUtils.writeVInt(out, arr.length);
+    for(String str : arr) {
+      Text.writeString(out, str);
+    }
+  }
+
+  /**
+   * Reads and returns a String array that is written by
+   * {@link #writeStringArray(DataOutput, String[])}.
+   * @param in the data input to read from
+   * @return read String[]
+   */
+  public static String[] readStringArray(DataInput in) throws IOException {
+    int len = WritableUtils.readVInt(in);
+    String[] arr = new String[len];
+    for(int i=0; i<len; i++) {
+      arr[i] = Text.readString(in);
+    }
+    return arr;
+  }
+
+  /**
+   * Stores the given object in the configuration under the given dataKey.
+   * The object's class name is recorded under {@code dataKey + "._class"}.
+   * @param obj the object to store
+   * @param conf the configuration to store the object into
+   * @param dataKey the key to store the data
+   */
+  public static<T> void storeToConf(T obj, Configuration conf, String dataKey)
+    throws IOException {
+    String classKey = dataKey + "._class";
+    // Binary name, so that Class.forName() in loadFromConf can resolve it.
+    conf.set(classKey, obj.getClass().getName());
+    DefaultStringifier.store(conf, obj, dataKey);
+  }
+
+  /**
+   * Loads the object stored by {@link #storeToConf(Object, Configuration, String)}
+   * method from the configuration under the given dataKey.
+   * @param conf the configuration to read from
+   * @param dataKey the key to get the data from
+   * @return the store object
+   * @throws IOException if nothing was stored under the key, or loading fails
+   */
+  @SuppressWarnings("unchecked")
+  public static<T> T loadFromConf(Configuration conf, String dataKey)
+    throws IOException {
+    String classKey = dataKey + "._class";
+    String className = conf.get(classKey);
+    if(className == null) {
+      throw new IOException("No class name stored under key: " + classKey);
+    }
+    try {
+      T obj = (T) DefaultStringifier.load(conf, dataKey, Class.forName(className));
+      return obj;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Copies the remaining contents of the buffers into a single byte[].
+   * Note that this consumes the buffers: each one's position is advanced to
+   * its limit.
+   */
+  //TODO: not tested
+  public static byte[] getAsBytes(List<ByteBuffer> buffers) {
+    //find total size
+    int size = 0;
+    for(ByteBuffer buffer : buffers) {
+      size += buffer.remaining();
+    }
+
+    byte[] arr = new byte[size];
+
+    int offset = 0;
+    for(ByteBuffer buffer : buffers) {
+      int len = buffer.remaining();
+      buffer.get(arr, offset, len);
+      offset += len;
+    }
+
+    return arr;
+  }
+
+  /**
+   * Reads until the end of the input stream, and returns the contents as a
+   * byte[]. The stream is not closed.
+   */
+  public static byte[] readFully(InputStream in) throws IOException {
+    // InputStream.read may return fewer bytes than requested before the end
+    // of the stream, so keep reading until it reports -1.
+    ByteArrayOutputStream out = new ByteArrayOutputStream(BUFFER_SIZE);
+    byte[] chunk = new byte[BUFFER_SIZE];
+    int count;
+    while((count = in.read(chunk, 0, BUFFER_SIZE)) != -1) {
+      out.write(chunk, 0, count);
+    }
+    return out.toByteArray();
+  }
+
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/NodeWalker.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/NodeWalker.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/NodeWalker.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/NodeWalker.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.util;
+
+import java.util.Stack;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+/* Copied from Apache Nutch */
+
+/**
+ * <p>A utility class that allows the walking of any DOM tree using a stack
+ * instead of recursion.  As the node tree is walked, the next node is popped
+ * off of the stack and all of its children are automatically added to the
+ * stack, so nodes are visited depth-first in document order (each node before
+ * its children).</p>
+ *
+ * <p>Currently this class is not thread safe.  It is assumed that only one
+ * thread will be accessing the <code>NodeWalker</code> at any given time.</p>
+ */
+public class NodeWalker {
+
+  // the node most recently returned by nextNode(), its children, and the
+  // stack of nodes still to visit (the top of the stack is visited next)
+  private Node currentNode;
+  private NodeList currentChildren;
+  private Stack<Node> nodes;
+
+  /**
+   * Starts the <code>Node</code> tree from the root node.
+   *
+   * @param rootNode the node to start walking from
+   */
+  public NodeWalker(Node rootNode) {
+
+    nodes = new Stack<Node>();
+    nodes.add(rootNode);
+  }
+
+  /**
+   * <p>Returns the next <code>Node</code> on the stack and pushes all of its
+   * children onto the stack, allowing us to walk the node tree without the
+   * use of recursion.  If there are no more nodes on the stack then null is
+   * returned.</p>
+   *
+   * @return Node The next <code>Node</code> on the stack or null if there
+   * isn't a next node.
+   */
+  public Node nextNode() {
+
+    // if no next node return null
+    if (!hasNext()) {
+      return null;
+    }
+
+    // pop the next node off of the stack and push all of its children onto
+    // the stack
+    currentNode = nodes.pop();
+    currentChildren = currentNode.getChildNodes();
+    int childLen = (currentChildren != null) ? currentChildren.getLength() : 0;
+
+    // push the children last-to-first so the first child ends up on top of
+    // the stack and is visited next
+    for (int i = childLen - 1; i >= 0; i--) {
+      nodes.add(currentChildren.item(i));
+    }
+
+    return currentNode;
+  }
+
+  /**
+   * <p>Skips over and removes from the node stack the children of the last
+   * node.  When getting a next node from the walker, that node's children
+   * are automatically added to the stack.  You can call this method to remove
+   * those children from the stack.</p>
+   *
+   * <p>This is useful when you don't want to process deeper into the
+   * current path of the node tree but you want to continue processing sibling
+   * nodes.  Calling it again, or after the stack has been emptied, is
+   * harmless.</p>
+   */
+  public void skipChildren() {
+
+    int childLen = (currentChildren != null) ? currentChildren.getLength() : 0;
+
+    // Stop if the stack runs out: peek() on an empty Stack throws
+    // EmptyStackException (e.g. when this method is called twice in a row).
+    for (int i = 0; i < childLen && !nodes.isEmpty(); i++) {
+      Node child = nodes.peek();
+      if (child.equals(currentChildren.item(i))) {
+        nodes.pop();
+      }
+    }
+  }
+
+  /**
+   * Returns true if there are more nodes on the current stack.
+   * @return true if {@link #nextNode()} will return another node
+   */
+  public boolean hasNext() {
+    return !nodes.isEmpty();
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/Null.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/Null.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/Null.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/Null.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.util;
+
+/**
+ * Placeholder type for arguments where a type is required but no meaningful
+ * value exists. A shared instance is available from {@link #get()}.
+ */
+public class Null {
+
+  /** The shared instance returned by {@link #get()}. */
+  private static final Null INSTANCE = new Null();
+
+  /**
+   * Creates a new placeholder instance, distinct from the shared one
+   * returned by {@link #get()}.
+   */
+  public Null() {
+    // nothing to initialize
+  }
+
+  /**
+   * Returns the shared placeholder instance.
+   *
+   * @return the same {@code Null} instance on every call
+   */
+  public static Null get() {
+    return INSTANCE;
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/OperationNotSupportedException.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/OperationNotSupportedException.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/OperationNotSupportedException.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/OperationNotSupportedException.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.util;
+
+/**
+ * Unchecked exception signalling that an operation is not supported or not
+ * implemented.
+ */
+public class OperationNotSupportedException extends RuntimeException {
+
+  private static final long serialVersionUID = 2929205790920793629L;
+
+  /** Creates an exception with neither a detail message nor a cause. */
+  public OperationNotSupportedException() {
+  }
+
+  /**
+   * Creates an exception with the given detail message.
+   *
+   * @param message the detail message
+   */
+  public OperationNotSupportedException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception that wraps the given cause.
+   *
+   * @param cause the underlying cause
+   */
+  public OperationNotSupportedException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates an exception with the given detail message and cause.
+   *
+   * @param message the detail message
+   * @param cause the underlying cause
+   */
+  public OperationNotSupportedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ReflectionUtils.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ReflectionUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/ReflectionUtils.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+
+/**
+ * Utility methods related to reflection.
+ */
+public class ReflectionUtils {
+
+  /** Shared zero-length parameter-type array used to look up no-arg constructors. */
+  public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0];
+
+  /** Shared zero-length argument array used to invoke no-arg constructors. */
+  public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+
+  /**
+   * Returns the public no-argument constructor of the class, with its
+   * accessible flag set.
+   *
+   * @param clazz the class to inspect; must not be null
+   * @return the no-argument constructor
+   * @throws IllegalArgumentException if {@code clazz} is null
+   * @throws NoSuchMethodException if the class has no public no-arg constructor
+   * @throws SecurityException if the lookup or the accessibility change is denied
+   */
+  public static <T> Constructor<T> getConstructor(Class<T> clazz)
+    throws SecurityException, NoSuchMethodException {
+    if (clazz == null) {
+      throw new IllegalArgumentException("class cannot be null");
+    }
+    Constructor<T> cons = clazz.getConstructor(EMPTY_CLASS_ARRAY);
+    // the constructor is public, but its declaring class may not be
+    cons.setAccessible(true);
+    return cons;
+  }
+
+  /**
+   * Constructs a new instance of the class using the no-arg constructor.
+   * @param clazz the class of the object; must not be null
+   * @return a new instance of the object
+   * @throws IllegalArgumentException if {@code clazz} is null
+   */
+  public static <T> T newInstance(Class<T> clazz) throws InstantiationException
+  , IllegalAccessException, SecurityException, NoSuchMethodException
+  , IllegalArgumentException, InvocationTargetException {
+    Constructor<T> cons = getConstructor(clazz);
+    return cons.newInstance(EMPTY_OBJECT_ARRAY);
+  }
+
+  /**
+   * Constructs a new instance of the class using the no-arg constructor.
+   * @param classStr the fully qualified class name of the object; must not be null
+   * @return a new instance of the object
+   * @throws IllegalArgumentException if {@code classStr} is null
+   * @throws ClassNotFoundException if no class with that name can be loaded
+   */
+  public static Object newInstance(String classStr) throws InstantiationException
+    , IllegalAccessException, ClassNotFoundException, SecurityException
+    , IllegalArgumentException, NoSuchMethodException, InvocationTargetException {
+    if (classStr == null) {
+      throw new IllegalArgumentException("class cannot be null");
+    }
+    Class<?> clazz = Class.forName(classStr);
+    return newInstance(clazz);
+  }
+
+  /**
+   * Returns the value of a named public static field.
+   *
+   * @param clazz the class declaring or inheriting the field; must not be null
+   * @param fieldName the name of the public field
+   * @return the current value of the field (primitives are boxed)
+   * @throws IllegalArgumentException if {@code clazz} is null or the field is not static
+   * @throws NoSuchFieldException if there is no public field with that name
+   */
+  public static Object getStaticField(Class<?> clazz, String fieldName)
+  throws IllegalArgumentException, SecurityException,
+  IllegalAccessException, NoSuchFieldException {
+    if (clazz == null) {
+      throw new IllegalArgumentException("class cannot be null");
+    }
+    Field field = clazz.getField(fieldName);
+    // Field.get(null) on an instance field fails with an opaque NullPointerException
+    if (!Modifier.isStatic(field.getModifiers())) {
+      throw new IllegalArgumentException("field " + fieldName + " of "
+          + clazz.getName() + " is not static");
+    }
+    return field.get(null);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/StringUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/StringUtils.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/StringUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/main/java/org/apache/gora/util/StringUtils.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A utility class for String related functionality.
+ */
+public class StringUtils {
+
+  /** Largest input size for {@link #powerset}: subsets are enumerated with an int counter. */
+  private static final int MAX_POWERSET_ELEMENTS = 30;
+
+  /**
+   * Joins the two given arrays, removing duplicate elements. Elements keep
+   * the order of their first occurrence: those of {@code arr1} first,
+   * followed by the not-yet-seen ones of {@code arr2}.
+   *
+   * @param arr1 the first array
+   * @param arr2 the second array
+   * @return a new array holding the distinct elements of both arrays
+   */
+  public static String[] joinStringArrays(String[] arr1, String... arr2) {
+    // LinkedHashSet (not HashSet) so the result order is deterministic
+    Set<String> set = new LinkedHashSet<String>();
+    for (String str : arr1) {
+      set.add(str);
+    }
+    for (String str : arr2) {
+      set.add(str);
+    }
+    return set.toArray(new String[set.size()]);
+  }
+
+  /** Returns the given strings joined with commas. */
+  public static String join(List<String> strs) {
+    return join(new StringBuilder(), strs).toString();
+  }
+
+  /** Returns the given strings joined with commas. */
+  public static String join(String[] strs) {
+    return join(new StringBuilder(), strs).toString();
+  }
+
+  /** Appends the given strings, separated by commas, to {@code builder} and returns it. */
+  public static StringBuilder join(StringBuilder builder, Collection<String> strs) {
+    boolean first = true;
+    for (String s : strs) {
+      if (!first) {
+        builder.append(',');
+      }
+      builder.append(s);
+      first = false;
+    }
+    return builder;
+  }
+
+  /** Appends the given strings, separated by commas, to {@code builder} and returns it. */
+  public static StringBuilder join(StringBuilder builder, String[] strs) {
+    for (int i = 0; i < strs.length; i++) {
+      if (i != 0) {
+        builder.append(',');
+      }
+      builder.append(strs[i]);
+    }
+    return builder;
+  }
+
+  /** Returns true if the string is neither null nor empty. */
+  public static boolean is(String str) {
+    return str != null && str.length() > 0;
+  }
+
+  /**
+   * Returns the power set of the given elements by running a binary counter.
+   * The counter's n bits select elements, the most significant bit
+   * corresponding to {@code set[0]}.
+   * Example: S = {a,b,c}
+   * P(S) = {[], [c], [b], [b, c], [a], [a, c], [a, b], [a, b, c]}
+   * Adapted from:
+   * http://jvalentino.blogspot.com/2007/02/shortcut-to-calculating-power-set-using.html
+   *
+   * @param set the elements
+   * @return the subsets, in counter order
+   * @throws IllegalArgumentException if there are more than 30 elements
+   */
+  public static LinkedHashSet<Set<String>> powerset(String[] set) {
+    int elements = set.length;
+    // 2^31 or more subsets cannot be counted with an int (nor held in memory)
+    if (elements > MAX_POWERSET_ELEMENTS) {
+      throw new IllegalArgumentException("cannot compute the power set of "
+          + elements + " elements; at most " + MAX_POWERSET_ELEMENTS + " are supported");
+    }
+
+    LinkedHashSet<Set<String>> power = new LinkedHashSet<Set<String>>();
+    int powerElements = 1 << elements;
+    for (int mask = 0; mask < powerElements; mask++) {
+      LinkedHashSet<String> innerSet = new LinkedHashSet<String>();
+      for (int j = 0; j < elements; j++) {
+        // set[j] is selected by bit (n-1-j), i.e. the j-th digit from the left
+        if ((mask & (1 << (elements - 1 - j))) != 0) {
+          innerSet.add(set[j]);
+        }
+      }
+      power.add(innerSet);
+    }
+    return power;
+  }
+
+  /**
+   * Parses {@code str} as a decimal int, returning {@code defaultValue} when
+   * {@code str} is null.
+   * @throws NumberFormatException if {@code str} is non-null but not a valid int
+   */
+  public static int parseInt(String str, int defaultValue) {
+    if (str == null) {
+      return defaultValue;
+    }
+    return Integer.parseInt(str);
+  }
+
+  /**
+   * Returns the name of the class without the package name.
+   */
+  public static String getClassname(Class<?> clazz) {
+    return getClassname(clazz.getName());
+  }
+
+  /**
+   * Returns the name of the class without the package name, i.e. the part
+   * after the last '.'.
+   */
+  public static String getClassname(String classname) {
+    String[] parts = classname.split("\\.");
+    return parts[parts.length - 1];
+  }
+
+}

Modified: incubator/gora/trunk/gora-core/src/test/conf/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/conf/.gitignore?rev=1021390&r1=1021389&r2=1021390&view=diff
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/conf/.gitignore (original)
+++ incubator/gora/trunk/gora-core/src/test/conf/.gitignore Mon Oct 11 15:40:22 2010
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+

Modified: incubator/gora/trunk/gora-core/src/test/conf/gora.properties
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/conf/gora.properties?rev=1021390&r1=1021389&r2=1021390&view=diff
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/conf/gora.properties (original)
+++ incubator/gora/trunk/gora-core/src/test/conf/gora.properties Mon Oct 11 15:40:22 2010
@@ -1,4 +1,19 @@
-gora.datastore.default=org.gora.mock.store.MockDataStore
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+gora.datastore.default=org.apache.gora.mock.store.MockDataStore
 gora.datastore.autocreateschema=true
 gora.avrostore.output.path=file:///tmp/gora.avrostore.test.output
 

Added: incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/GoraTestDriver.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+
+/**
+ * GoraTestDriver is a helper class for third party tests.
+ * GoraTestDriver can be used to initialize and tear down mini clusters
+ * (such as mini HBase cluster, local Hsqldb instance, etc) so that
+ * these details are abstracted away.
+ */
+public class GoraTestDriver {
+
+  protected static final Log log = LogFactory.getLog(GoraTestDriver.class);
+
+  /** The data store implementation created by {@link #createDataStore}. */
+  @SuppressWarnings("rawtypes")
+  protected Class<? extends DataStore> dataStoreClass;
+
+  /** Stores created through {@link #createDataStore}; cleaned up in {@link #tearDown()}. */
+  @SuppressWarnings("rawtypes")
+  protected HashSet<DataStore> dataStores;
+
+  @SuppressWarnings("rawtypes")
+  protected GoraTestDriver(Class<? extends DataStore> dataStoreClass) {
+    this.dataStoreClass = dataStoreClass;
+    this.dataStores = new HashSet<DataStore>();
+  }
+
+  /** Should be called once before the tests are started, probably in the
+   * method annotated with org.junit.BeforeClass
+   */
+  public void setUpClass() throws Exception {
+    setProperties(DataStoreFactory.properties);
+  }
+
+  /** Should be called once after the tests have finished, probably in the
+   * method annotated with org.junit.AfterClass
+   */
+  public void tearDownClass() throws Exception {
+
+  }
+
+  /** Should be called once before each test, probably in the
+   * method annotated with org.junit.Before. Truncates the schema of every
+   * store created so far; a failure on one store is logged and does not
+   * prevent truncating the others.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setUp() throws Exception {
+    log.info("setting up test");
+    for (DataStore store : dataStores) {
+      try {
+        store.truncateSchema();
+      } catch (IOException e) {
+        // best effort: keep going with the remaining stores
+        log.warn("could not truncate schema of " + store.getClass().getName(), e);
+      }
+    }
+  }
+
+  /** Should be called once after each test, probably in the
+   * method annotated with org.junit.After. Deletes the schema of and closes
+   * every store created so far, then forgets them. An IOException on one
+   * store is logged and does not prevent cleaning up the others.
+   */
+  @SuppressWarnings("rawtypes")
+  public void tearDown() throws Exception {
+    log.info("tearing down test");
+    //delete everything
+    for (DataStore store : dataStores) {
+      try {
+        try {
+          store.deleteSchema();
+        } finally {
+          // close even if deleting the schema failed, so the store is not leaked
+          store.close();
+        }
+      } catch (IOException e) {
+        log.warn("could not clean up " + store.getClass().getName(), e);
+      }
+    }
+    dataStores.clear();
+  }
+
+  /** Hook to adjust the properties used to create data stores; a no-op here. */
+  protected void setProperties(Properties properties) {
+  }
+
+  /**
+   * Creates a data store of {@link #dataStoreClass} for the given key and
+   * persistent classes, and registers it for cleanup in {@link #tearDown()}.
+   */
+  @SuppressWarnings("unchecked")
+  public<K, T extends Persistent> DataStore<K,T>
+    createDataStore(Class<K> keyClass, Class<T> persistentClass) {
+    setProperties(DataStoreFactory.properties);
+    DataStore<K,T> dataStore = DataStoreFactory.createDataStore(
+        (Class<? extends DataStore<K,T>>)dataStoreClass, keyClass, persistentClass);
+    dataStores.add(dataStore);
+
+    return dataStore;
+  }
+
+  /** Returns the data store implementation class used by this driver. */
+  public Class<?> getDataStoreClass() {
+    return dataStoreClass;
+  }
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/TestPersistentDatumReader.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/TestPersistentDatumReader.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/TestPersistentDatumReader.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/TestPersistentDatumReader.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.avro;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.avro.PersistentDatumReader;
+import org.apache.gora.examples.WebPageDataCreator;
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.memory.store.MemStore;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.DataStoreTestUtil;
+import org.junit.Test;
+
+/**
+ * Test case for {@link PersistentDatumReader}: every clone must be non-null
+ * and equal to the object it was cloned from.
+ */
+public class TestPersistentDatumReader {
+
+  private PersistentDatumReader<WebPage> webPageDatumReader
+    = new PersistentDatumReader<WebPage>();
+
+  /** Clones {@code persistent} using its own schema and checks the copy. */
+  private void testClone(Persistent persistent) throws IOException {
+    assertClone(persistent,
+        webPageDatumReader.clone(persistent, persistent.getSchema()));
+  }
+
+  /** Asserts that {@code cloned} is non-null and equal to {@code persistent}. */
+  private void assertClone(Persistent persistent, Persistent cloned) {
+    Assert.assertNotNull("cloned object is null", cloned);
+    Assert.assertEquals("cloned object is not equal to original object", persistent, cloned);
+  }
+
+  @Test
+  public void testCloneEmployee() throws Exception {
+    @SuppressWarnings("unchecked")
+    MemStore<String, Employee> memStore = DataStoreFactory.getDataStore(
+        MemStore.class, String.class, Employee.class);
+    testClone(DataStoreTestUtil.createEmployee(memStore));
+  }
+
+  @Test
+  public void testCloneEmployeeOneField() throws Exception {
+    Employee onlySsn = new Employee();
+    onlySsn.setSsn(new Utf8("11111"));
+    testClone(onlySsn);
+  }
+
+  @Test
+  public void testCloneEmployeeTwoFields() throws Exception {
+    Employee ssnAndSalary = new Employee();
+    ssnAndSalary.setSsn(new Utf8("11111"));
+    ssnAndSalary.setSalary(100);
+    testClone(ssnAndSalary);
+  }
+
+  @Test
+  public void testCloneWebPage() throws Exception {
+    @SuppressWarnings("unchecked")
+    DataStore<String, WebPage> pageStore = DataStoreFactory.createDataStore(
+        MemStore.class, String.class, WebPage.class);
+    WebPageDataCreator.createWebPageData(pageStore);
+
+    Query<String, WebPage> everything = pageStore.newQuery();
+    Result<String, WebPage> pages = everything.execute();
+
+    // every page written by the creator must be visited and cloned
+    int clonedCount = 0;
+    for (; pages.next(); clonedCount++) {
+      testClone(pages.get());
+    }
+    Assert.assertEquals(WebPageDataCreator.URLS.length, clonedCount);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/mapreduce/TestDataFileAvroStoreMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/mapreduce/TestDataFileAvroStoreMapReduce.java?rev=1021390&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/mapreduce/TestDataFileAvroStoreMapReduce.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/apache/gora/avro/mapreduce/TestDataFileAvroStoreMapReduce.java Mon Oct 11 15:40:22 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.avro.mapreduce;
+
+import static org.apache.gora.avro.store.TestAvroStore.WEBPAGE_OUTPUT;
+
+import java.io.IOException;
+
+import org.apache.gora.avro.store.DataFileAvroStore;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+
+/**
+ * MapReduce tests for {@link DataFileAvroStore}, whose input and output
+ * paths both point at {@code WEBPAGE_OUTPUT}.
+ */
+public class TestDataFileAvroStoreMapReduce extends DataStoreMapReduceTestBase {
+
+  /**
+   * Creates the test case.
+   * @throws IOException as declared; presumably propagated from the
+   *     {@link DataStoreMapReduceTestBase} constructor
+   */
+  public TestDataFileAvroStoreMapReduce() throws IOException {
+  }
+
+  /** Builds an initialized {@link DataFileAvroStore} for {@link WebPage} records. */
+  @Override
+  protected DataStore<String, WebPage> createWebPageDataStore()
+    throws IOException {
+    DataFileAvroStore<String, WebPage> store =
+        new DataFileAvroStore<String, WebPage>();
+    store.initialize(String.class, WebPage.class, DataStoreFactory.properties);
+    // input and output share one path
+    store.setOutputPath(WEBPAGE_OUTPUT);
+    store.setInputPath(WEBPAGE_OUTPUT);
+    return store;
+  }
+
+}



Mime
View raw message