gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1006024 [6/8] - in /incubator/gora: branches/ tags/ trunk/ trunk/bin/ trunk/conf/ trunk/docs/ trunk/gora-cassandra/ trunk/gora-cassandra/ivy/ trunk/gora-cassandra/lib-ext/ trunk/gora-cassandra/src/ trunk/gora-cassandra/src/examples/ trunk/...
Date Fri, 08 Oct 2010 21:17:17 GMT
Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/persistency/impl/TestStateManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/persistency/impl/TestStateManagerImpl.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/persistency/impl/TestStateManagerImpl.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/persistency/impl/TestStateManagerImpl.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,97 @@
+
+package org.gora.persistency.impl;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.avro.util.Utf8;
+import org.gora.examples.generated.Employee;
+import org.gora.mock.persistency.MockPersistent;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link StateManagerImpl}. Every test runs against a fresh
+ * state manager and a {@link MockPersistent} constructed with it, both of
+ * which are created in {@link #setUp()}.
+ */
+public class TestStateManagerImpl {
+
+  private StateManagerImpl stateManager;
+  private MockPersistent persistent;
+
+  /** Builds the state manager under test and a mock persistent that uses it. */
+  @Before
+  public void setUp() {
+    stateManager = new StateManagerImpl();
+    persistent = new MockPersistent(stateManager);
+  }
+
+  /** The object-level dirty flag starts unset and is raised by setDirty. */
+  @Test
+  public void testDirty() {
+    final boolean dirtyBefore = stateManager.isDirty(persistent);
+    Assert.assertFalse(dirtyBefore);
+
+    stateManager.setDirty(persistent);
+
+    final boolean dirtyAfter = stateManager.isDirty(persistent);
+    Assert.assertTrue(dirtyAfter);
+  }
+
+  /** Marking one field dirty must not mark another field dirty. */
+  @Test
+  public void testDirty2() {
+    final int first = 0;
+    final int second = 1;
+
+    Assert.assertFalse(stateManager.isDirty(persistent, first));
+    Assert.assertFalse(stateManager.isDirty(persistent, second));
+
+    stateManager.setDirty(persistent, first);
+
+    Assert.assertTrue(stateManager.isDirty(persistent, first));
+    Assert.assertFalse(stateManager.isDirty(persistent, second));
+  }
+
+  /** clearDirty resets the object-level dirty flag after a field was marked. */
+  @Test
+  public void testClearDirty() {
+    Assert.assertFalse(stateManager.isDirty(persistent));
+
+    final int fieldIndex = 0;
+    stateManager.setDirty(persistent, fieldIndex);
+    stateManager.clearDirty(persistent);
+
+    Assert.assertFalse(stateManager.isDirty(persistent));
+  }
+
+  /** Marking one field readable must not mark another field readable. */
+  @Test
+  public void testReadable() throws IOException {
+    final int first = 0;
+    final int second = 1;
+
+    Assert.assertFalse(stateManager.isReadable(persistent, first));
+    Assert.assertFalse(stateManager.isReadable(persistent, second));
+
+    stateManager.setReadable(persistent, first);
+
+    Assert.assertTrue(stateManager.isReadable(persistent, first));
+    Assert.assertFalse(stateManager.isReadable(persistent, second));
+  }
+
+  /**
+   * Calling a setter on a generated bean ({@link Employee#setName}) makes
+   * field 0 readable and leaves field 1 untouched.
+   */
+  @Test
+  public void testReadable2() {
+    stateManager = new StateManagerImpl();
+    final Employee emp = new Employee(stateManager);
+    final int first = 0;
+    final int second = 1;
+
+    Assert.assertFalse(stateManager.isReadable(emp, first));
+    Assert.assertFalse(stateManager.isReadable(emp, second));
+
+    emp.setName(new Utf8("foo"));
+
+    Assert.assertTrue(stateManager.isReadable(emp, first));
+    Assert.assertFalse(stateManager.isReadable(emp, second));
+  }
+
+  /** clearReadable resets a field that was previously marked readable. */
+  @Test
+  public void testClearReadable() {
+    final int fieldIndex = 0;
+    stateManager.setReadable(persistent, fieldIndex);
+    stateManager.clearReadable(persistent);
+
+    Assert.assertFalse(stateManager.isReadable(persistent, fieldIndex));
+  }
+
+  /** A freshly constructed persistent reports itself as new. */
+  @Test
+  public void testIsNew() {
+    final boolean isNew = persistent.isNew();
+    Assert.assertTrue(isNew);
+  }
+
+  /** setNew leaves the persistent flagged as new. */
+  @Test
+  public void testNew() {
+    stateManager.setNew(persistent);
+
+    final boolean isNew = persistent.isNew();
+    Assert.assertTrue(isNew);
+  }
+
+  /** clearNew removes the new flag from the persistent. */
+  @Test
+  public void testClearNew() {
+    stateManager.clearNew(persistent);
+
+    final boolean isNew = persistent.isNew();
+    Assert.assertFalse(isNew);
+  }
+
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestPartitionQueryImpl.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestPartitionQueryImpl.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestPartitionQueryImpl.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestPartitionQueryImpl.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,30 @@
+
+package org.gora.query.impl;
+
+import org.apache.hadoop.io.TestWritable;
+import org.gora.mock.persistency.MockPersistent;
+import org.gora.mock.query.MockQuery;
+import org.gora.mock.store.MockDataStore;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link PartitionQueryImpl}: wraps a {@link MockQuery} in a
+ * partition query and hands it to Hadoop's {@link TestWritable} helper.
+ */
+public class TestPartitionQueryImpl {
+
+  private MockDataStore dataStore = MockDataStore.get();
+
+  /**
+   * Builds a partition query around a base query that has a start key and a
+   * limit set, then passes it to {@code TestWritable.testWritable}.
+   */
+  @Test
+  public void testReadWrite() throws Exception {
+    final MockQuery wrapped = dataStore.newQuery();
+    wrapped.setStartKey("start");
+    wrapped.setLimit(42);
+
+    final PartitionQueryImpl<String, MockPersistent> partitionQuery =
+        new PartitionQueryImpl<String, MockPersistent>(wrapped);
+
+    TestWritable.testWritable(partitionQuery);
+  }
+
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestQueryBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestQueryBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestQueryBase.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/query/impl/TestQueryBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,46 @@
+
+
+package org.gora.query.impl;
+
+import junit.framework.Assert;
+
+import org.gora.mock.query.MockQuery;
+import org.gora.mock.store.MockDataStore;
+import org.gora.util.TestIOUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link QueryBase}, exercised through {@link MockQuery}.
+ */
+public class TestQueryBase {
+
+  private static final String[] FIELDS = {"foo", "baz", "bar"};
+  private static final String START_KEY = "1_start";
+  private static final String END_KEY = "2_end";
+
+  private MockDataStore dataStore = MockDataStore.get();
+  private MockQuery query;
+
+  /** Obtains a new query from the mock data store before each test. */
+  @Before
+  public void setUp() {
+    //MockQuery extends QueryBase
+    this.query = this.dataStore.newQuery();
+  }
+
+  /**
+   * Passes a query with fields and a key range to
+   * {@code TestIOUtils.testSerializeDeserialize} and checks that the query
+   * still has a data store afterwards.
+   */
+  @Test
+  public void testReadWrite() throws Exception {
+    this.query.setFields(FIELDS);
+    this.query.setKeyRange(START_KEY, END_KEY);
+
+    TestIOUtils.testSerializeDeserialize(this.query);
+
+    Assert.assertNotNull(this.query.getDataStore());
+  }
+
+  /**
+   * Passes a query with a limit and a time range to
+   * {@code TestIOUtils.testSerializeDeserialize}.
+   */
+  @Test
+  public void testReadWrite2() throws Exception {
+    this.query.setLimit(1000);
+    this.query.setTimeRange(0, System.currentTimeMillis());
+
+    TestIOUtils.testSerializeDeserialize(this.query);
+  }
+
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestBase.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestBase.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestBase.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,350 @@
+
+package org.gora.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.Assert;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.gora.GoraTestDriver;
+import org.gora.examples.generated.Employee;
+import org.gora.examples.generated.Metadata;
+import org.gora.examples.generated.WebPage;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A base class for {@link DataStore} tests. This is just a convenience
+ * class, which actually only uses {@link DataStoreTestUtil} methods to
+ * run the tests. Not all test cases can extend this class (like TestHBaseStore),
+ * so all test logic shuold reside in DataStoreTestUtil class.
+ */
+public abstract class DataStoreTestBase {
+
+  public static final Log log = LogFactory.getLog(DataStoreTestBase.class);
+
+  protected static GoraTestDriver testDriver;
+
+  protected DataStore<String,Employee> employeeStore;
+  protected DataStore<String,WebPage> webPageStore;
+
+  @Deprecated
+  protected abstract DataStore<String,Employee> createEmployeeDataStore() throws IOException ;
+
+  @Deprecated
+  protected abstract DataStore<String,WebPage> createWebPageDataStore() throws IOException;
+
+  /** junit annoyingly forces BeforeClass to be static, so this method
+   * should be called from a static block
+   */
+  protected static void setTestDriver(GoraTestDriver driver) {
+    testDriver = driver;
+  }
+
+  private static boolean setUpClassCalled = false;
+  
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    if(testDriver != null && !setUpClassCalled) {
+      log.info("setting up class");
+      testDriver.setUpClass();
+      setUpClassCalled = true;
+    }
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    if(testDriver != null) {
+      log.info("tearing down class");
+      testDriver.tearDownClass();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    //There is an issue in JUnit 4 tests in Eclipse where TestSqlStore static
+    //methods are not called BEFORE setUpClass. I think this is a bug in 
+    //JUnitRunner in Eclipse. Below is a workaround for that problem.
+    if(!setUpClassCalled) setUpClass();  
+    
+    log.info("setting up test");
+    if(testDriver != null) {
+      employeeStore = testDriver.createDataStore(String.class, Employee.class);
+      webPageStore = testDriver.createDataStore(String.class, WebPage.class);
+      testDriver.setUp();
+    } else {
+      employeeStore =  createEmployeeDataStore();
+      webPageStore = createWebPageDataStore();
+
+      employeeStore.truncateSchema();
+      webPageStore.truncateSchema();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    log.info("tearing down test");
+    if(testDriver != null) {
+      testDriver.tearDown();
+    }
+    //employeeStore.close();
+    //webPageStore.close();
+  }
+
+  @Test
+  public void testNewInstance() throws IOException {
+    log.info("test method: testNewInstance");
+    DataStoreTestUtil.testNewPersistent(employeeStore);
+  }
+
+  @Test
+  public void testCreateSchema() throws Exception {
+    log.info("test method: testCreateSchema");
+    DataStoreTestUtil.testCreateEmployeeSchema(employeeStore);
+    assertSchemaExists("Employee");
+  }
+
+  // Override this to assert that schema is created correctly
+  public void assertSchemaExists(String schemaName) throws Exception {
+  }
+
+  @Test
+  public void testAutoCreateSchema() throws Exception {
+    log.info("test method: testAutoCreateSchema");
+    DataStoreTestUtil.testAutoCreateSchema(employeeStore);
+    assertAutoCreateSchema();
+  }
+
+  public void assertAutoCreateSchema() throws Exception {
+    assertSchemaExists("Employee");
+  }
+
+  @Test
+  public  void testTruncateSchema() throws Exception {
+    log.info("test method: testTruncateSchema");
+    DataStoreTestUtil.testTruncateSchema(webPageStore);
+    assertSchemaExists("WebPage");
+  }
+
+  @Test
+  public void testDeleteSchema() throws IOException {
+    log.info("test method: testDeleteSchema");
+    DataStoreTestUtil.testDeleteSchema(webPageStore);
+  }
+
+  @Test
+  public void testSchemaExists() throws Exception {
+    log.info("test method: testSchemaExists");
+    DataStoreTestUtil.testSchemaExists(webPageStore);
+    assertSchemaExists("WebPage");
+  }
+
+  @Test
+  public void testPut() throws IOException {
+    log.info("test method: testPut");
+    Employee employee = DataStoreTestUtil.testPutEmployee(employeeStore);
+    assertPut(employee);
+  }
+
+  public void assertPut(Employee employee) throws IOException {
+  }
+
+  @Test
+  public void testPutNested() throws IOException {
+    log.info("test method: testPutNested");
+
+    String revUrl = "foo.com:http/";
+    String url = "http://foo.com/";
+
+    webPageStore.createSchema();
+    WebPage page = webPageStore.newPersistent();
+    Metadata metadata = new Metadata();  
+    metadata.setVersion(1);
+    metadata.putToData(new Utf8("foo"), new Utf8("baz"));
+
+    page.setMetadata(metadata);
+    page.setUrl(new Utf8(url));
+
+    webPageStore.put(revUrl, page);
+    webPageStore.flush();
+
+    page = webPageStore.get(revUrl);
+    metadata = page.getMetadata();
+    Assert.assertNotNull(metadata);
+    Assert.assertEquals(1, metadata.getVersion());
+    Assert.assertEquals(new Utf8("baz"), metadata.getData().get(new Utf8("foo")));
+  }
+
+  @Test
+  public void testPutArray() throws IOException {
+    log.info("test method: testPutArray");
+    webPageStore.createSchema();
+    WebPage page = webPageStore.newPersistent();
+
+    String[] tokens = {"example", "content", "in", "example.com"};
+
+    for(String token: tokens) {
+      page.addToParsedContent(new Utf8(token));
+    }
+
+    webPageStore.put("com.example/http", page);
+    webPageStore.close();
+
+    assertPutArray();
+  }
+
+  public void assertPutArray() throws IOException {
+  }
+
+  @Test
+  public void testPutBytes() throws IOException {
+    log.info("test method: testPutBytes");
+    webPageStore.createSchema();
+    WebPage page = webPageStore.newPersistent();
+    page.setUrl(new Utf8("http://example.com"));
+    byte[] contentBytes = "example content in example.com".getBytes();
+    ByteBuffer buff = ByteBuffer.wrap(contentBytes);
+    page.setContent(buff);
+
+    webPageStore.put("com.example/http", page);
+    webPageStore.close();
+
+    assertPutBytes(contentBytes);
+  }
+
+  public void assertPutBytes(byte[] contentBytes) throws IOException {
+  }
+
+  @Test
+  public void testPutMap() throws IOException {
+    log.info("test method: testPutMap");
+    webPageStore.createSchema();
+
+    WebPage page = webPageStore.newPersistent();
+
+    page.setUrl(new Utf8("http://example.com"));
+    page.putToOutlinks(new Utf8("http://example2.com"), new Utf8("anchor2"));
+    page.putToOutlinks(new Utf8("http://example3.com"), new Utf8("anchor3"));
+    page.putToOutlinks(new Utf8("http://example3.com"), new Utf8("anchor4"));
+    webPageStore.put("com.example/http", page);
+    webPageStore.close();
+
+    assertPutMap();
+  }
+
+  public void assertPutMap() throws IOException {
+  }
+
+  @Test
+  public void testUpdate() throws IOException {
+    log.info("test method: testUpdate");
+    DataStoreTestUtil.testUpdateEmployee(employeeStore);
+    DataStoreTestUtil.testUpdateWebPage(webPageStore);
+  }
+
+  public void testEmptyUpdate() throws IOException {
+    DataStoreTestUtil.testEmptyUpdateEmployee(employeeStore);
+  }
+
+  @Test
+  public void testGet() throws IOException {
+    log.info("test method: testGet");
+    DataStoreTestUtil.testGetEmployee(employeeStore);
+  }
+
+  @Test
+  public void testGetWithFields() throws IOException {
+    log.info("test method: testGetWithFields");
+    DataStoreTestUtil.testGetEmployeeWithFields(employeeStore);
+  }
+
+  @Test
+  public void testGetWebPage() throws IOException {
+    log.info("test method: testGetWebPage");
+    DataStoreTestUtil.testGetWebPage(webPageStore);
+  }
+
+  @Test
+  public void testGetWebPageDefaultFields() throws IOException {
+    log.info("test method: testGetWebPageDefaultFields");
+    DataStoreTestUtil.testGetWebPageDefaultFields(webPageStore);
+  }
+
+  @Test
+  public void testGetNonExisting() throws Exception {
+    log.info("test method: testGetNonExisting");
+    DataStoreTestUtil.testGetEmployeeNonExisting(employeeStore);
+  }
+
+ @Test
+  public void testQuery() throws IOException {
+    log.info("test method: testQuery");
+    DataStoreTestUtil.testQueryWebPages(webPageStore);
+  }
+
+  @Test
+  public void testQueryStartKey() throws IOException {
+    log.info("test method: testQueryStartKey");
+    DataStoreTestUtil.testQueryWebPageStartKey(webPageStore);
+  }
+
+  @Test
+  public void testQueryEndKey() throws IOException {
+    log.info("test method: testQueryEndKey");
+    DataStoreTestUtil.testQueryWebPageEndKey(webPageStore);
+  }
+
+  @Test
+  public void testQueryKeyRange() throws IOException {
+    log.info("test method: testQueryKetRange");
+    DataStoreTestUtil.testQueryWebPageKeyRange(webPageStore);
+  }
+
+ @Test
+  public void testQueryWebPageSingleKey() throws IOException {
+   log.info("test method: testQueryWebPageSingleKey");
+    DataStoreTestUtil.testQueryWebPageSingleKey(webPageStore);
+  }
+
+  @Test
+  public void testQueryWebPageSingleKeyDefaultFields() throws IOException {
+    log.info("test method: testQuerySingleKeyDefaultFields");
+    DataStoreTestUtil.testQueryWebPageSingleKeyDefaultFields(webPageStore);
+  }
+
+  @Test
+  public void testQueryWebPageQueryEmptyResults() throws IOException {
+    log.info("test method: testQueryEmptyResults");
+    DataStoreTestUtil.testQueryWebPageEmptyResults(webPageStore);
+  }
+
+  @Test
+  public void testDelete() throws IOException {
+    log.info("test method: testDelete");
+    DataStoreTestUtil.testDelete(webPageStore);
+  }
+
+  @Test
+  public void testDeleteByQuery() throws IOException {
+    log.info("test method: testDeleteByQuery");
+    DataStoreTestUtil.testDeleteByQuery(webPageStore);
+  }
+
+  @Test
+  public void testDeleteByQueryFields() throws IOException {
+    log.info("test method: testQueryByQueryFields");
+    DataStoreTestUtil.testDeleteByQueryFields(webPageStore);
+  }
+
+  @Test
+  public void testGetPartitions() throws IOException {
+    log.info("test method: testGetPartitions");
+    DataStoreTestUtil.testGetPartitions(webPageStore);
+  }
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestUtil.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestUtil.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestUtil.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/store/DataStoreTestUtil.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,686 @@
+
+package org.gora.store;
+
+import static org.gora.examples.WebPageDataCreator.ANCHORS;
+import static org.gora.examples.WebPageDataCreator.CONTENTS;
+import static org.gora.examples.WebPageDataCreator.LINKS;
+import static org.gora.examples.WebPageDataCreator.SORTED_URLS;
+import static org.gora.examples.WebPageDataCreator.URLS;
+import static org.gora.examples.WebPageDataCreator.URL_INDEXES;
+import static org.gora.examples.WebPageDataCreator.createWebPageData;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+import org.gora.examples.WebPageDataCreator;
+import org.gora.examples.generated.Employee;
+import org.gora.examples.generated.WebPage;
+import org.gora.persistency.Persistent;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.util.ByteUtils;
+import org.gora.util.StringUtils;
+
+/**
+ * Test utilities for DataStores
+ */
+public class DataStoreTestUtil {
+
+  public static final long YEAR_IN_MS = 365L * 24L * 60L * 60L * 1000L;
+  private static final int NUM_KEYS = 4;
+
+  /**
+   * Checks that {@code dataStore.newPersistent()} returns non-null, distinct
+   * objects whose class equals the store's persistent class.
+   *
+   * @param dataStore the store to obtain new persistent objects from
+   * @throws IOException if the store fails
+   */
+  public static <K, T extends Persistent> void testNewPersistent(
+      DataStore<K,T> dataStore) throws IOException {
+
+    T obj1 = dataStore.newPersistent();
+    T obj2 = dataStore.newPersistent();
+
+    //null checks come first so that a null result is reported as an
+    //assertion failure instead of a NullPointerException in getClass()
+    Assert.assertNotNull(obj1);
+    Assert.assertNotNull(obj2);
+    Assert.assertEquals(dataStore.getPersistentClass(),
+        obj1.getClass());
+    Assert.assertNotSame(obj1, obj2);
+  }
+
+  /**
+   * Creates (but does not store) an employee named "Random Joe" with a date
+   * of birth twenty {@link #YEAR_IN_MS} periods before now.
+   *
+   * @param dataStore the store used to instantiate the employee
+   * @return the populated employee
+   */
+  public static <K> Employee createEmployee(
+      DataStore<K, Employee> dataStore) throws IOException {
+    final long twentyYearsInMs = 20L * YEAR_IN_MS;
+
+    final Employee joe = dataStore.newPersistent();
+    joe.setName(new Utf8("Random Joe"));
+    joe.setDateOfBirth(System.currentTimeMillis() - twentyYearsInMs);
+    joe.setSalary(100000);
+    joe.setSsn(new Utf8("101010101010"));
+    return joe;
+  }
+
+  /**
+   * Puts an employee under the key "foo" without an explicit createSchema()
+   * call; the put is expected not to throw.
+   */
+  public static void testAutoCreateSchema(DataStore<String,Employee> dataStore)
+      throws IOException {
+    final Employee newcomer = createEmployee(dataStore);
+    //should not throw exception
+    dataStore.put("foo", newcomer);
+  }
+
+  public static void testCreateEmployeeSchema(DataStore<String, Employee> dataStore)
+  throws IOException {
+    dataStore.createSchema();
+
+    //should not throw exception
+    dataStore.createSchema();
+  }
+
+  public static void testTruncateSchema(DataStore<String, WebPage> dataStore)
+  throws IOException {
+    dataStore.createSchema();
+    WebPageDataCreator.createWebPageData(dataStore);
+    dataStore.truncateSchema();
+
+    assertEmptyResults(dataStore.newQuery());
+  }
+
+  public static void testDeleteSchema(DataStore<String, WebPage> dataStore)
+  throws IOException {
+    dataStore.createSchema();
+    WebPageDataCreator.createWebPageData(dataStore);
+    dataStore.deleteSchema();
+    dataStore.createSchema();
+
+    assertEmptyResults(dataStore.newQuery());
+  }
+
+  /**
+   * Asserts that schemaExists() is true after createSchema() and false after
+   * deleteSchema().
+   */
+  public static<K, T extends Persistent> void testSchemaExists(
+      DataStore<K, T> dataStore) throws IOException {
+    dataStore.createSchema();
+    final boolean existsAfterCreate = dataStore.schemaExists();
+    Assert.assertTrue(existsAfterCreate);
+
+    dataStore.deleteSchema();
+    final boolean existsAfterDelete = dataStore.schemaExists();
+    Assert.assertFalse(existsAfterDelete);
+  }
+
+  /**
+   * Stores an employee under its SSN, reads it back with all fields and
+   * asserts that the result equals the stored object.
+   */
+  public static void testGetEmployee(DataStore<String, Employee> dataStore)
+      throws IOException {
+    dataStore.createSchema();
+
+    final Employee stored = createEmployee(dataStore);
+    final String key = stored.getSsn().toString();
+    dataStore.put(key, stored);
+    dataStore.flush();
+
+    final Employee loaded = dataStore.get(key, Employee._ALL_FIELDS);
+    Assert.assertEquals(stored, loaded);
+  }
+
+  public static void testGetEmployeeNonExisting(DataStore<String, Employee> dataStore)
+    throws IOException {
+    Employee employee = dataStore.get("_NON_EXISTING_SSN_FOR_EMPLOYEE_");
+    Assert.assertNull(employee);
+  }
+
+  public static void testGetEmployeeWithFields(DataStore<String, Employee> dataStore)
+    throws IOException {
+    Employee employee = DataStoreTestUtil.createEmployee(dataStore);
+    String ssn = employee.getSsn().toString();
+    dataStore.put(ssn, employee);
+    dataStore.flush();
+
+    String[] fields = employee.getFields();
+    for(Set<String> subset : StringUtils.powerset(fields)) {
+      if(subset.isEmpty())
+        continue;
+      Employee after = dataStore.get(ssn, subset.toArray(new String[subset.size()]));
+      Employee expected = new Employee();
+      for(String field:subset) {
+        int index = expected.getFieldIndex(field);
+        expected.put(index, employee.get(index));
+      }
+
+      Assert.assertEquals(expected, after);
+    }
+  }
+
+  /**
+   * Creates the schema and returns a freshly created employee.
+   *
+   * NOTE(review): despite the name, this method never calls
+   * {@code dataStore.put}; confirm whether callers are expected to perform
+   * the put themselves.
+   *
+   * @return the employee built by {@link #createEmployee}
+   */
+  public static Employee testPutEmployee(DataStore<String, Employee> dataStore)
+      throws IOException {
+    dataStore.createSchema();
+    return createEmployee(dataStore);
+  }
+
+  public static void testEmptyUpdateEmployee(DataStore<String, Employee> dataStore)
+  throws IOException {
+    dataStore.createSchema();
+    long ssn = 1234567890L;
+    String ssnStr = Long.toString(ssn);
+    long now = System.currentTimeMillis();
+
+    Employee employee = dataStore.newPersistent();
+    employee.setName(new Utf8("John Doe"));
+    employee.setDateOfBirth(now - 20L *  YEAR_IN_MS);
+    employee.setSalary(100000);
+    employee.setSsn(new Utf8(ssnStr));
+    dataStore.put(employee.getSsn().toString(), employee);
+
+    dataStore.flush();
+
+    employee = dataStore.get(ssnStr);
+    dataStore.put(ssnStr, employee);
+
+    dataStore.flush();
+
+    employee = dataStore.newPersistent();
+    dataStore.put(Long.toString(ssn + 1), employee);
+
+    dataStore.flush();
+
+    employee = dataStore.get(Long.toString(ssn + 1));
+    Assert.assertNull(employee);
+  }
+
+  /**
+   * Stores five employees, overwrites the first one with new values and
+   * asserts that the overwritten values are read back.
+   */
+  public static void testUpdateEmployee(DataStore<String, Employee> dataStore)
+      throws IOException {
+    dataStore.createSchema();
+
+    final long baseSsn = 1234567890L;
+    final long now = System.currentTimeMillis();
+    final int initialCount = 5;
+    final int updatedCount = 1;
+
+    //initial population
+    for (int n = 0; n < initialCount; n++) {
+      Employee original = dataStore.newPersistent();
+      original.setName(new Utf8("John Doe " + n));
+      original.setDateOfBirth(now - 20L * YEAR_IN_MS);
+      original.setSalary(100000);
+      original.setSsn(new Utf8(Long.toString(baseSsn + n)));
+      dataStore.put(original.getSsn().toString(), original);
+    }
+    dataStore.flush();
+
+    //overwrite under the same keys
+    for (int n = 0; n < updatedCount; n++) {
+      Employee replacement = dataStore.newPersistent();
+      replacement.setName(new Utf8("John Doe " + (n + 5)));
+      replacement.setDateOfBirth(now - 18L * YEAR_IN_MS);
+      replacement.setSalary(120000);
+      replacement.setSsn(new Utf8(Long.toString(baseSsn + n)));
+      dataStore.put(replacement.getSsn().toString(), replacement);
+    }
+    dataStore.flush();
+
+    //the overwritten values must be visible
+    for (int n = 0; n < updatedCount; n++) {
+      Employee loaded = dataStore.get(Long.toString(baseSsn + n));
+      Assert.assertEquals(now - 18L * YEAR_IN_MS, loaded.getDateOfBirth());
+      Assert.assertEquals("John Doe " + (n + 5), loaded.getName().toString());
+      Assert.assertEquals(120000, loaded.getSalary());
+    }
+  }
+
+  /**
+   * Exercises updates of array, map and bytes fields of {@link WebPage}.
+   * The test runs in three rounds, each followed by a flush:
+   * <ol>
+   * <li>store pages with 5 parsed-content entries and the even-indexed
+   * outlinks;</li>
+   * <li>re-read each page, set its content, append 5 more parsed-content
+   * entries and replace the outlinks by the odd-indexed ones, then verify;</li>
+   * <li>re-read each page, add the even-indexed outlinks again and verify
+   * that every index is present.</li>
+   * </ol>
+   *
+   * @param dataStore the web page store under test
+   * @throws IOException if the store fails
+   */
+  public static void testUpdateWebPage(DataStore<String, WebPage> dataStore)
+  throws IOException {
+    dataStore.createSchema();
+
+    String[] urls = {"http://a.com/a", "http://b.com/b", "http://c.com/c",
+        "http://d.com/d", "http://e.com/e", "http://f.com/f", "http://g.com/g"};
+    String content = "content";
+    String parsedContent = "parsedContent";
+    String anchor = "anchor";
+
+    //round 1: initial pages with even-indexed outlinks
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.newPersistent();
+      webPage.setUrl(new Utf8(urls[i]));
+      for (int parsedContentCount = 0; parsedContentCount < 5; parsedContentCount++) {
+        webPage.addToParsedContent(new Utf8(parsedContent + i + "," + parsedContentCount));
+      }
+      for (int j = 0; j < urls.length; j += 2) {
+        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    //round 2: add content, extend the array, swap outlinks to odd indexes
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      webPage.setContent(ByteBuffer.wrap(ByteUtils.toBytes(content + i)));
+      for (int parsedContentCount = 5; parsedContentCount < 10; parsedContentCount++) {
+        webPage.addToParsedContent(new Utf8(parsedContent + i + "," + parsedContentCount));
+      }
+      webPage.getOutlinks().clear();
+      for (int j = 1; j < urls.length; j += 2) {
+        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      Assert.assertEquals(content + i, ByteUtils.toString(webPage.getContent().array()));
+      Assert.assertEquals(10, webPage.getParsedContent().size());
+      int j = 0;
+      for (Utf8 pc : webPage.getParsedContent()) {
+        Assert.assertEquals(parsedContent + i + "," + j, pc.toString());
+        j++;
+      }
+      //only the odd-indexed outlinks may be left
+      int count = 0;
+      for (j = 1; j < urls.length; j += 2) {
+        Utf8 link = webPage.getOutlinks().get(new Utf8(anchor + j));
+        Assert.assertNotNull(link);
+        Assert.assertEquals(urls[j], link.toString());
+        count++;
+      }
+      Assert.assertEquals(count, webPage.getOutlinks().size());
+    }
+
+    //round 3: add the even-indexed outlinks back
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      for (int j = 0; j < urls.length; j += 2) {
+        webPage.putToOutlinks(new Utf8(anchor + j), new Utf8(urls[j]));
+      }
+      dataStore.put(webPage.getUrl().toString(), webPage);
+    }
+
+    dataStore.flush();
+
+    //every index, odd and even, must now be present
+    for (int i = 0; i < urls.length; i++) {
+      WebPage webPage = dataStore.get(urls[i]);
+      for (int j = 0; j < urls.length; j++) {
+        Utf8 link = webPage.getOutlinks().get(new Utf8(anchor + j));
+        Assert.assertNotNull(link);
+        Assert.assertEquals(urls[j], link.toString());
+      }
+    }
+  }
+
+  /**
+   * Asserts that {@code page} matches the i-th entry of the
+   * {@code WebPageDataCreator} test data: URL, raw content, parsed-content
+   * tokens (the content split on single spaces) and outlinks.
+   *
+   * @param page the page read from the store; must not be null
+   * @param i index into URLS, CONTENTS, LINKS and ANCHORS
+   */
+  public static void assertWebPage(WebPage page, int i) {
+    Assert.assertNotNull(page);
+
+    Assert.assertEquals(URLS[i], page.getUrl().toString());
+    byte[] actualContent = page.getContent().array();
+    //the stored content is the actual value, CONTENTS[i] the expected one
+    Assert.assertTrue("content error: actual=" + new String(actualContent) +
+        " expected=" + CONTENTS[i] + " i=" + i
+    , Arrays.equals(actualContent
+        , CONTENTS[i].getBytes()));
+
+    GenericArray<Utf8> parsedContent = page.getParsedContent();
+    Assert.assertNotNull(parsedContent);
+    Assert.assertTrue(parsedContent.size() > 0);
+
+    int j=0;
+    String[] tokens = CONTENTS[i].split(" ");
+    for(Utf8 token : parsedContent) {
+      //fail as an assertion, not as an ArrayIndexOutOfBoundsException,
+      //when the page holds more tokens than expected
+      Assert.assertTrue("unexpected extra token:" + token + " i=" + i,
+          j < tokens.length);
+      Assert.assertEquals(tokens[j++], token.toString());
+    }
+
+    if(LINKS[i].length > 0) {
+      Assert.assertNotNull(page.getOutlinks());
+      Assert.assertTrue(page.getOutlinks().size() > 0);
+      for(j=0; j<LINKS[i].length; j++) {
+        Assert.assertEquals(ANCHORS[i][j],
+            page.getFromOutlinks(new Utf8(URLS[LINKS[i][j]])).toString());
+      }
+    } else {
+      Assert.assertTrue(page.getOutlinks() == null || page.getOutlinks().isEmpty());
+    }
+  }
+
+  /**
+   * Loads the web page test data into the store and reads every URL back
+   * with the given fields, checking each page with {@link #assertWebPage}.
+   */
+  private static void testGetWebPage(DataStore<String, WebPage> store, String[] fields)
+      throws IOException {
+    createWebPageData(store);
+
+    int index = 0;
+    while (index < URLS.length) {
+      WebPage loaded = store.get(URLS[index], fields);
+      assertWebPage(loaded, index);
+      index++;
+    }
+  }
+
+  /** Runs the get test requesting {@code WebPage._ALL_FIELDS}. */
+  public static void testGetWebPage(DataStore<String, WebPage> store) throws IOException {
+    final String[] requestedFields = WebPage._ALL_FIELDS;
+    testGetWebPage(store, requestedFields);
+  }
+
+  /** Runs the get test passing {@code null} as the field list. */
+  public static void testGetWebPageDefaultFields(DataStore<String, WebPage> store)
+      throws IOException {
+    final String[] noExplicitFields = null;
+    testGetWebPage(store, noExplicitFields);
+  }
+
+  /**
+   * Loads the web page test data and, for every URL, executes a query keyed
+   * on that URL with the given fields. Each query must return exactly one
+   * result, which is checked with {@link #assertWebPage}.
+   */
+  private static void testQueryWebPageSingleKey(DataStore<String, WebPage> store
+      , String[] fields) throws IOException {
+
+    createWebPageData(store);
+
+    int index = 0;
+    while (index < URLS.length) {
+      Query<String, WebPage> singleKeyQuery = store.newQuery();
+      singleKeyQuery.setFields(fields);
+      singleKeyQuery.setKey(URLS[index]);
+
+      Result<String, WebPage> rows = singleKeyQuery.execute();
+      //exactly one row is expected
+      Assert.assertTrue(rows.next());
+      assertWebPage(rows.get(), index);
+      Assert.assertFalse(rows.next());
+
+      index++;
+    }
+  }
+
+  /** Runs the single-key query test requesting {@code WebPage._ALL_FIELDS}. */
+  public static void testQueryWebPageSingleKey(DataStore<String, WebPage> store)
+      throws IOException {
+    final String[] requestedFields = WebPage._ALL_FIELDS;
+    testQueryWebPageSingleKey(store, requestedFields);
+  }
+
+  /** Runs the single-key query test passing {@code null} as the field list. */
+  public static void testQueryWebPageSingleKeyDefaultFields(
+      DataStore<String, WebPage> store) throws IOException {
+    final String[] noExplicitFields = null;
+    testQueryWebPageSingleKey(store, noExplicitFields);
+  }
+
+  /**
+   * Loads the web page test data and queries it over key ranges taken from
+   * the sorted URL list, asserting the content and the number of results.
+   * When a bound is not set, only a single iteration is made over the
+   * corresponding loop.
+   *
+   * @param store the web page store under test
+   * @param setStartKeys whether queries set a start key
+   * @param setEndKeys whether queries set an end key
+   */
+  public static void testQueryWebPageKeyRange(DataStore<String, WebPage> store,
+      boolean setStartKeys, boolean setEndKeys)
+      throws IOException {
+    createWebPageData(store);
+
+    //sorted copy of the urls
+    final List<String> ordered = new ArrayList<String>(Arrays.asList(URLS));
+    Collections.sort(ordered);
+
+    //try all ranges
+    for (int lo = 0; lo < ordered.size(); lo++) {
+      for (int hi = lo; hi < ordered.size(); hi++) {
+        Query<String, WebPage> rangeQuery = store.newQuery();
+        if (setStartKeys) {
+          rangeQuery.setStartKey(ordered.get(lo));
+        }
+        if (setEndKeys) {
+          rangeQuery.setEndKey(ordered.get(hi));
+        }
+        Result<String, WebPage> rows = rangeQuery.execute();
+
+        int seen = 0;
+        while (rows.next()) {
+          WebPage page = rows.get();
+          assertWebPage(page, URL_INDEXES.get(page.getUrl().toString()));
+          seen++;
+        }
+
+        //an unset bound extends to the corresponding end of the list
+        int upper = setEndKeys ? hi + 1 : ordered.size();
+        int lower = setStartKeys ? lo : 0;
+        Assert.assertEquals(upper - lower, seen);
+
+        if (!setEndKeys) {
+          break;
+        }
+      }
+      if (!setStartKeys) {
+        break;
+      }
+    }
+  }
+
+  /** Runs the range check with neither a start key nor an end key set. */
+  public static void testQueryWebPages(DataStore<String, WebPage> store)
+  throws IOException {
+    testQueryWebPageKeyRange(store, false, false);
+  }
+
+  /** Runs the range check with only the start key set. */
+  public static void testQueryWebPageStartKey(DataStore<String, WebPage> store)
+  throws IOException {
+    testQueryWebPageKeyRange(store, true, false);
+  }
+
+  /** Runs the range check with only the end key set. */
+  public static void testQueryWebPageEndKey(DataStore<String, WebPage> store)
+  throws IOException {
+    testQueryWebPageKeyRange(store, false, true);
+  }
+
+  /** Runs the range check with both the start key and the end key set. */
+  public static void testQueryWebPageKeyRange(DataStore<String, WebPage> store)
+  throws IOException {
+    testQueryWebPageKeyRange(store, true, true);
+  }
+
+  /**
+   * Populates the store and asserts that a range query and a single-key query
+   * over keys that are expected to be absent ("aa".."ab", "aa") return no rows.
+   */
+  public static void testQueryWebPageEmptyResults(DataStore<String, WebPage> store)
+    throws IOException {
+    createWebPageData(store);
+
+    //query empty results
+    Query<String, WebPage> query = store.newQuery();
+    query.setStartKey("aa");
+    query.setEndKey("ab");
+    assertEmptyResults(query);
+
+    //query empty results for one key
+    query = store.newQuery();
+    query.setKey("aa");
+    assertEmptyResults(query);
+  }
+
+  /** Asserts that executing the query yields zero rows. */
+  public static<K,T extends Persistent> void assertEmptyResults(Query<K, T> query)
+    throws IOException {
+    assertNumResults(query, 0);
+  }
+
+  /**
+   * Executes the query, counts the rows it returns and asserts the count.
+   *
+   * @param query      query to execute
+   * @param numResults expected number of rows
+   * @throws IOException if executing, iterating or closing the result fails
+   */
+  public static<K,T extends Persistent> void assertNumResults(Query<K, T>query
+      , long numResults) throws IOException {
+    Result<K, T> result = query.execute();
+    //counted as long to match the type of the expected value
+    long actualNumResults = 0;
+    try {
+      while(result.next()) {
+        actualNumResults++;
+      }
+    } finally {
+      //close even if iteration throws, so the result is not leaked
+      result.close();
+    }
+    Assert.assertEquals(numResults, actualNumResults);
+  }
+
+  /** Populates the store and runs the partition check for an unrestricted query. */
+  public static void testGetPartitions(DataStore<String, WebPage> store)
+  throws IOException {
+    createWebPageData(store);
+    testGetPartitions(store, store.newQuery());
+  }
+
+  /**
+   * Asks the store to partition the query, checks that a non-empty list of
+   * non-null partitions comes back and then verifies the partitions against
+   * the query through {@code assertPartitions}.
+   *
+   * @param store data store under test
+   * @param query query to partition
+   * @throws IOException if the store fails
+   */
+  public static void testGetPartitions(DataStore<String, WebPage> store
+      , Query<String, WebPage> query) throws IOException {
+    List<PartitionQuery<String, WebPage>> partitions = store.getPartitions(query);
+
+    Assert.assertNotNull(partitions);
+    Assert.assertFalse(partitions.isEmpty());
+
+    for(int i=0; i<partitions.size(); i++) {
+      Assert.assertNotNull(partitions.get(i));
+    }
+
+    assertPartitions(store, query, partitions);
+  }
+
+  /**
+   * Verifies that the partitions of a query together return exactly the rows
+   * that the query itself returns: same number of rows, unique keys, and for
+   * every key the same {@code hashCode} of the returned object.
+   *
+   * @param store      data store used to execute the query and its partitions
+   * @param query      the original query
+   * @param partitions the partitions obtained for {@code query}
+   * @throws IOException if executing, iterating or closing a result fails
+   */
+  public static void assertPartitions(DataStore<String, WebPage> store,
+      Query<String, WebPage> query, List<PartitionQuery<String,WebPage>> partitions)
+  throws IOException {
+
+    int count = 0, partitionsCount = 0;
+    Map<String, Integer> results = new HashMap<String, Integer>();
+    Map<String, Integer> partitionResults = new HashMap<String, Integer>();
+
+    //execute query and count results
+    Result<String, WebPage> result = store.execute(query);
+    Assert.assertNotNull(result);
+
+    try {
+      while(result.next()) {
+        Assert.assertNotNull(result.getKey());
+        Assert.assertNotNull(result.get());
+        results.put(result.getKey(), result.get().hashCode()); //keys are not reused, so this is safe
+        count++;
+      }
+    } finally {
+      //close even when an assertion above fails
+      result.close();
+    }
+
+    Assert.assertTrue(count > 0); //assert that results is not empty
+    Assert.assertEquals(count, results.size()); //assert that keys are unique
+
+    for(PartitionQuery<String, WebPage> partition:partitions) {
+      Assert.assertNotNull(partition);
+
+      result = store.execute(partition);
+      Assert.assertNotNull(result);
+
+      try {
+        while(result.next()) {
+          Assert.assertNotNull(result.getKey());
+          Assert.assertNotNull(result.get());
+          partitionResults.put(result.getKey(), result.get().hashCode());
+          partitionsCount++;
+        }
+      } finally {
+        //close even when an assertion above fails
+        result.close();
+      }
+
+      //running total vs. distinct keys: fails as soon as two partitions overlap
+      Assert.assertEquals(partitionsCount, partitionResults.size()); //assert that keys are unique
+    }
+
+    Assert.assertTrue(partitionsCount > 0);
+    Assert.assertEquals(count, partitionsCount);
+
+    //every row of the full query must appear in some partition with the same hash
+    for(Map.Entry<String, Integer> r : results.entrySet()) {
+      Integer p = partitionResults.get(r.getKey());
+      Assert.assertNotNull(p);
+      Assert.assertEquals(r.getValue(), p);
+    }
+  }
+
+  public static void testDelete(DataStore<String, WebPage> store) throws IOException {
+    WebPageDataCreator.createWebPageData(store);
+    //delete one by one
+
+    int deletedSoFar = 0;
+    for(String url : URLS) {
+      Assert.assertTrue(store.delete(url));
+      store.flush();
+
+      //assert that it is actually deleted
+      Assert.assertNull(store.get(url));
+
+      //assert that other records are not deleted
+      assertNumResults(store.newQuery(), URLS.length - ++deletedSoFar);
+    }
+  }
+
+  /**
+   * Exercises {@code deleteByQuery} for whole-row deletes. Each scenario
+   * repopulates the store, checks that all fixture rows are present, deletes
+   * by query and checks the remaining row count. The schema is truncated at
+   * the end.
+   */
+  public static void testDeleteByQuery(DataStore<String, WebPage> store)
+    throws IOException {
+
+    Query<String, WebPage> query;
+
+    //test 1 - delete all (unrestricted query, no fields set)
+    WebPageDataCreator.createWebPageData(store);
+
+    query = store.newQuery();
+
+    assertNumResults(store.newQuery(), URLS.length);
+    store.deleteByQuery(query);
+    assertEmptyResults(store.newQuery());
+
+
+    //test 2 - delete all (all fields listed explicitly)
+    WebPageDataCreator.createWebPageData(store);
+
+    query = store.newQuery();
+    query.setFields(WebPage._ALL_FIELDS);
+
+    assertNumResults(store.newQuery(), URLS.length);
+    store.deleteByQuery(query);
+    assertEmptyResults(store.newQuery());
+
+
+    //test 3 - delete all (key range wide enough to cover every fixture key)
+    WebPageDataCreator.createWebPageData(store);
+
+    query = store.newQuery();
+    query.setKeyRange("a", "z"); //all start with "http://"
+
+    assertNumResults(store.newQuery(), URLS.length);
+    store.deleteByQuery(query);
+    assertEmptyResults(store.newQuery());
+
+
+    //test 4 - delete some (only an end key; the expected count below treats
+    //the end key as inclusive, i.e. NUM_KEYS+1 rows are removed)
+    WebPageDataCreator.createWebPageData(store);
+    query = store.newQuery();
+    query.setEndKey(SORTED_URLS[NUM_KEYS]);
+
+    assertNumResults(store.newQuery(), URLS.length);
+    store.deleteByQuery(query);
+    assertNumResults(store.newQuery(), URLS.length - (NUM_KEYS+1));
+
+    store.truncateSchema();
+
+  }
+
+  /**
+   * Exercises {@code deleteByQuery} when the query names specific fields.
+   * In both scenarios the row count is asserted to be unchanged after the
+   * delete, i.e. only the named fields are expected to be removed, not the rows.
+   */
+  public static void testDeleteByQueryFields(DataStore<String, WebPage> store)
+  throws IOException {
+
+    Query<String, WebPage> query;
+
+    //test 5 - delete all with some fields
+    WebPageDataCreator.createWebPageData(store);
+
+    query = store.newQuery();
+    query.setFields(WebPage.Field.OUTLINKS.getName()
+        , WebPage.Field.PARSED_CONTENT.getName(), WebPage.Field.CONTENT.getName());
+
+    assertNumResults(store.newQuery(), URLS.length);
+    //the delete is issued three times as a workaround, see the remark below
+    store.deleteByQuery(query);
+    store.deleteByQuery(query);
+    store.deleteByQuery(query);//don't you love that HBase sometimes does not delete arbitrarily
+    assertNumResults(store.newQuery(), URLS.length);
+
+    //assert that data is deleted
+    for (int i = 0; i < SORTED_URLS.length; i++) {
+      WebPage page = store.get(SORTED_URLS[i]);
+      Assert.assertNotNull(page);
+
+      Assert.assertNotNull(page.getUrl());
+      //NOTE(review): arguments are in (actual, expected) order
+      Assert.assertEquals(page.getUrl().toString(), SORTED_URLS[i]);
+      Assert.assertEquals(0, page.getOutlinks().size());
+      Assert.assertEquals(0, page.getParsedContent().size());
+      //NOTE(review): this branch never fails - a non-null content is only
+      //printed, and the else branch asserts what its condition already
+      //established. Deletion of CONTENT is therefore not actually verified.
+      if(page.getContent() != null) {
+        System.out.println("url:" + page.getUrl().toString());
+        System.out.println( "limit:" + page.getContent().limit());
+      } else {
+        Assert.assertNull(page.getContent());
+      }
+    }
+
+    //test 6 - delete some with some fields
+    WebPageDataCreator.createWebPageData(store);
+
+    query = store.newQuery();
+    query.setFields(WebPage.Field.URL.getName());
+    String startKey = SORTED_URLS[NUM_KEYS];
+    String endKey = SORTED_URLS[SORTED_URLS.length - NUM_KEYS];
+    query.setStartKey(startKey);
+    query.setEndKey(endKey);
+
+    assertNumResults(store.newQuery(), URLS.length);
+    store.deleteByQuery(query);
+    store.deleteByQuery(query);
+    store.deleteByQuery(query);//don't you love that HBase sometimes does not delete arbitrarily
+
+    assertNumResults(store.newQuery(), URLS.length);
+
+    //assert that data is deleted
+    for (int i = 0; i < URLS.length; i++) {
+      WebPage page = store.get(URLS[i]);
+      Assert.assertNotNull(page);
+      //NOTE(review): the row equal to endKey is treated as NOT deleted here
+      //(end exclusive), whereas testDeleteByQuery expects setEndKey to be
+      //inclusive - confirm which semantics the store guarantees.
+      if( URLS[i].compareTo(startKey) < 0 || URLS[i].compareTo(endKey) >= 0) {
+        //not deleted
+        assertWebPage(page, i);
+      } else {
+        //deleted: only the URL field is gone, the other fields must survive
+        Assert.assertNull(page.getUrl());
+        Assert.assertNotNull(page.getOutlinks());
+        Assert.assertNotNull(page.getParsedContent());
+        Assert.assertNotNull(page.getContent());
+        Assert.assertTrue(page.getOutlinks().size() > 0);
+        Assert.assertTrue(page.getParsedContent().size() > 0);
+      }
+    }
+
+  }
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/store/TestDataStoreFactory.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/store/TestDataStoreFactory.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/store/TestDataStoreFactory.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/store/TestDataStoreFactory.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,83 @@
+
+package org.gora.store;
+
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.gora.avro.store.DataFileAvroStore;
+import org.gora.mock.persistency.MockPersistent;
+import org.gora.mock.store.MockDataStore;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test case for {@link DataStoreFactory}.
+ */
+public class TestDataStoreFactory {
+  
+  @Before
+  public void setUp() {
+    //nothing to prepare; kept as a hook for future fixtures
+  }
+
+  /** A data store can be obtained by the class name of its implementation. */
+  @Test
+  public void testGetDataStore() throws ClassNotFoundException {
+    DataStore<?,?> dataStore = DataStoreFactory.getDataStore("org.gora.mock.store.MockDataStore"
+        , String.class, MockPersistent.class);
+    Assert.assertNotNull(dataStore);
+  }
+  
+  /** The returned store reports the key and persistent classes it was requested with. */
+  @Test
+  public void testGetClasses() throws ClassNotFoundException {
+    DataStore<?,?> dataStore = DataStoreFactory.getDataStore("org.gora.mock.store.MockDataStore"
+        , String.class, MockPersistent.class);
+    Assert.assertNotNull(dataStore);
+    Assert.assertEquals(String.class, dataStore.getKeyClass());
+    Assert.assertEquals(MockPersistent.class, dataStore.getPersistentClass());
+  }
+  
+  /** A data store can be obtained by the class object of its implementation. */
+  @Test
+  public void testGetDataStore2() throws ClassNotFoundException {
+    DataStore<?,?> dataStore = DataStoreFactory.getDataStore(MockDataStore.class
+        , String.class, MockPersistent.class);
+    Assert.assertNotNull(dataStore);
+  }
+  
+  /**
+   * Requests with identical arguments are expected to return the very same
+   * instance, while a different key class yields a different instance.
+   */
+  @Test
+  public void testGetDataStore3() throws ClassNotFoundException {
+    DataStore<?,?> dataStore1 = DataStoreFactory.getDataStore("org.gora.mock.store.MockDataStore"
+        , Object.class, MockPersistent.class);
+    DataStore<?,?> dataStore2 = DataStoreFactory.getDataStore("org.gora.mock.store.MockDataStore"
+        , Object.class, MockPersistent.class);
+    DataStore<?,?> dataStore3 = DataStoreFactory.getDataStore("org.gora.mock.store.MockDataStore"
+        , String.class, MockPersistent.class);
+    
+    //assertSame reports both instances on failure, unlike assertTrue(a == b)
+    Assert.assertSame(dataStore1, dataStore2);
+    Assert.assertNotSame(dataStore1, dataStore3);
+  }
+  
+  /**
+   * Without an explicit store class the factory is expected to hand out a
+   * {@link MockDataStore} (presumably configured through the test
+   * properties - TODO confirm).
+   */
+  @Test
+  public void testReadProperties() {
+    //indirect testing
+    DataStore<?,?> dataStore = DataStoreFactory.getDataStore(String.class, MockPersistent.class);
+    Assert.assertNotNull(dataStore);
+    Assert.assertEquals(MockDataStore.class, dataStore.getClass());
+  }
+  
+  /**
+   * Looks up three properties for a {@link DataFileAvroStore} and expects the
+   * configured values rather than the supplied defaults (the values are
+   * presumably defined in the test properties file - TODO confirm).
+   */
+  @Test
+  public void testFindProperty() {
+    Properties properties = DataStoreFactory.properties;
+    
+    DataStore<String, MockPersistent> store = new DataFileAvroStore<String,MockPersistent>();
+    
+    String fooValue = DataStoreFactory.findProperty(properties, store
+        , "foo_property", "foo_default");
+    Assert.assertEquals("foo_value", fooValue);
+    
+    String bazValue = DataStoreFactory.findProperty(properties, store
+        , "baz_property", "baz_default");
+    Assert.assertEquals("baz_value", bazValue);
+    
+    String barValue = DataStoreFactory.findProperty(properties, store
+        , "bar_property", "bar_default");
+    Assert.assertEquals("bar_value", barValue);
+  }
+  
+}

Added: incubator/gora/trunk/gora-core/src/test/java/org/gora/util/TestIOUtils.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-core/src/test/java/org/gora/util/TestIOUtils.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-core/src/test/java/org/gora/util/TestIOUtils.java (added)
+++ incubator/gora/trunk/gora-core/src/test/java/org/gora/util/TestIOUtils.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,228 @@
+
+package org.gora.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.ByteBufferInputStream;
+import org.apache.avro.ipc.ByteBufferOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.gora.mapreduce.GoraMapReduceUtils;
+import org.junit.Test;
+
+/**
+ * Test case for {@link IOUtils} class.
+ */
+public class TestIOUtils {
+
+  public static final Log log = LogFactory.getLog(TestIOUtils.class);
+  
+  public static Configuration conf = new Configuration();
+
+  private static final int BOOL_ARRAY_MAX_LENGTH = 30;
+  private static final int STRING_ARRAY_MAX_LENGTH = 30;
+  
+  /**
+   * {@link Writable} wrapper around a boolean array so that
+   * {@code IOUtils.writeBoolArray}/{@code readBoolArray} can be driven
+   * through the generic serialize/deserialize round trip.
+   */
+  private static class BoolArrayWrapper implements Writable {
+    boolean[] arr;
+    //no-arg constructor, presumably needed for reflective instantiation
+    //during deserialization - TODO confirm
+    @SuppressWarnings("unused")
+    public BoolArrayWrapper() {
+    }
+    public BoolArrayWrapper(boolean[] arr) {
+      this.arr = arr;
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.arr = IOUtils.readBoolArray(in);
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      IOUtils.writeBoolArray(out, arr);
+    }
+    /** Two wrappers are equal when their arrays have equal contents. */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      //guards against null and foreign types instead of throwing
+      if (!(obj instanceof BoolArrayWrapper)) {
+        return false;
+      }
+      return Arrays.equals(arr, ((BoolArrayWrapper)obj).arr);
+    }
+    /** Kept consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(arr);
+    }
+  }
+  
+  /**
+   * {@link Writable} wrapper around a String array so that
+   * {@code IOUtils.writeStringArray}/{@code readStringArray} can be driven
+   * through the generic serialize/deserialize round trip.
+   */
+  private static class StringArrayWrapper implements Writable {
+    String[] arr;
+    //no-arg constructor, presumably needed for reflective instantiation
+    //during deserialization - TODO confirm
+    @SuppressWarnings("unused")
+    public StringArrayWrapper() {
+    }
+    public StringArrayWrapper(String[] arr) {
+      this.arr = arr;
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.arr = IOUtils.readStringArray(in);
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      IOUtils.writeStringArray(out, arr);
+    }
+    /** Two wrappers are equal when their arrays have equal contents. */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      //guards against null and foreign types instead of throwing
+      if (!(obj instanceof StringArrayWrapper)) {
+        return false;
+      }
+      return Arrays.equals(arr, ((StringArrayWrapper)obj).arr);
+    }
+    /** Kept consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(arr);
+    }
+  }
+  
+  /**
+   * Serializes all given objects back to back into one buffer, deserializes
+   * them in the same order and asserts that each deserialized object equals
+   * its original. Finally asserts that no unread bytes remain.
+   *
+   * @param objects objects to round-trip; each one's runtime class is used
+   *                as the serialization class
+   * @throws Exception if serialization or deserialization fails
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> void testSerializeDeserialize(T... objects) throws Exception {
+    ByteBufferOutputStream os = new ByteBufferOutputStream();
+    DataOutputStream dos = new DataOutputStream(os);
+    ByteBufferInputStream is = null;
+    DataInputStream dis = null;
+    
+    GoraMapReduceUtils.setIOSerializations(conf, true);
+    
+    try {
+      for(T before : objects) {
+        IOUtils.serialize(conf, dos , before, (Class<T>)before.getClass());
+        dos.flush();
+      }
+       
+      //read back from the buffers that were just written
+      is = new ByteBufferInputStream(os.getBufferList());
+      dis = new DataInputStream(is);
+      
+      for(T before : objects) {
+        T after = IOUtils.deserialize(conf, dis, null, (Class<T>)before.getClass());
+        
+        log.info("Before: " + before);
+        log.info("After : " + after);
+        
+        Assert.assertEquals(before, after);
+      }
+      
+      //assert that the end of input is reached
+      try {
+        long skipped = dis.skip(1);
+        Assert.assertEquals(0, skipped);
+      }catch (EOFException expected) {
+        //either should throw exception or return 0 as skipped
+      }
+    }finally {
+      org.apache.hadoop.io.IOUtils.closeStream(dos);
+      org.apache.hadoop.io.IOUtils.closeStream(os);
+      org.apache.hadoop.io.IOUtils.closeStream(dis);
+      org.apache.hadoop.io.IOUtils.closeStream(is);
+    }
+  }
+  
+  /** Round-trips a Hadoop {@link Text} (a {@link Writable}) through serialize/deserialize. */
+  @Test
+  public void testWritableSerde() throws Exception {
+    Text text = new Text("foo goes to a bar to get some buzz");
+    testSerializeDeserialize(text);
+  }
+  
+  /** Round-trips a plain {@link Integer} through serialize/deserialize. */
+  @Test
+  public void testJavaSerializableSerde() throws Exception {
+    Integer integer = Integer.valueOf(42);
+    testSerializeDeserialize(integer);
+  }
+  
+  /**
+   * Round-trips boolean arrays of every length from 0 up to (but excluding)
+   * {@code BOOL_ARRAY_MAX_LENGTH}; for each length one array is built per
+   * pattern by repeating that pattern cyclically.
+   */
+  @Test
+  public void testReadWriteBoolArray() throws Exception {
+    
+    boolean[][] patterns = {
+        {true},
+        {false},
+        {true, false},
+        {false, true},
+        {true, false, true},
+        {false, true, false},
+        {false, true, false, false, true, true, true},
+        {false, true, false, false, true, true, true, true},
+        {false, true, false, false, true, true, true, true, false},
+    };
+    
+    for(int i=0; i<BOOL_ARRAY_MAX_LENGTH; i++) {
+      for(int j=0; j<patterns.length; j++) {
+        boolean[] arr = new boolean[i];
+        for(int k=0; k<i; k++) {
+          //wrap around so short patterns fill arrays of any length
+          arr[k] = patterns[j][k % patterns[j].length];
+        }
+        
+        testSerializeDeserialize(new BoolArrayWrapper(arr));
+      }
+    }
+  }
+  
+  /**
+   * Checks that the null-fields info survives a write/read round trip for
+   * several mixes of null and non-null values.
+   */
+  @Test
+  public void testReadWriteNullFieldsInfo() throws IOException {
+
+    Integer n = null; //null
+    //valueOf instead of the boxing constructor, as in testJavaSerializableSerde
+    Integer nn = Integer.valueOf(42); //not null
+
+    testNullFieldsWith(nn);
+    testNullFieldsWith(n);
+    testNullFieldsWith(n, nn);
+    testNullFieldsWith(nn, n);
+    testNullFieldsWith(nn, n, nn, n);
+    testNullFieldsWith(nn, n, nn, n, n, n, nn, nn, nn, n, n);
+  }
+
+  /**
+   * Writes the null-fields info for the given values, reads it back and
+   * asserts that there is one flag per value and that each flag is true
+   * exactly when the corresponding value is null.
+   */
+  private void testNullFieldsWith( Object ... values ) throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer();
+    IOUtils.writeNullFieldsInfo(out, values);
+
+    //feed exactly the bytes that were written into the reader
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+    boolean[] nullFlags = IOUtils.readNullFieldsInfo(in);
+
+    Assert.assertEquals(values.length, nullFlags.length);
+
+    int idx = 0;
+    for(Object value : values) {
+      Assert.assertEquals( value == null , nullFlags[idx]);
+      idx++;
+    }
+  }
+  
+  /**
+   * Round-trips String arrays of every length from 0 up to (but excluding)
+   * {@code STRING_ARRAY_MAX_LENGTH}; element j holds the decimal text of j.
+   */
+  @Test
+  public void testReadWriteStringArray() throws Exception {
+    for(int length=0; length<STRING_ARRAY_MAX_LENGTH; length++) {
+      String[] strings = new String[length];
+      for(int idx=0; idx<strings.length; idx++) {
+        strings[idx] = Integer.toString(idx);
+      }
+
+      testSerializeDeserialize(new StringArrayWrapper(strings));
+    }
+  }
+  
+  /**
+   * Checks {@code IOUtils.readFully} on inputs whose size lies just around
+   * {@code IOUtils.BUFFER_SIZE} (from two bytes below to two bytes above).
+   */
+  @Test
+  public void testReadFullyBufferLimit() throws IOException {
+    for(int i=-2; i<=2; i++) {
+      byte[] bytes = new byte[IOUtils.BUFFER_SIZE + i];
+      for(int j=0; j<bytes.length; j++) {
+        //position-dependent content, so misplaced bytes are detected
+        bytes[j] = (byte)j;
+      }
+      ByteArrayInputStream is = new ByteArrayInputStream(bytes);
+      
+      byte[] readBytes = IOUtils.readFully(is);
+      assertByteArrayEquals(bytes, readBytes);
+    }
+  }
+  
+  public void assertByteArrayEquals(byte[] expected, byte[] actual) {
+    Assert.assertEquals("Array lengths do not match", expected.length, actual.length);
+    for(int j=0; j<expected.length; j++) {
+      Assert.assertEquals("bytes at position "+j+" do not match", expected[j], actual[j]);
+    }
+  }
+}

Added: incubator/gora/trunk/gora-hbase/build.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/build.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/build.xml (added)
+++ incubator/gora/trunk/gora-hbase/build.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- Ant build file of the gora-hbase module. It defines no targets of its
+     own; everything, including the default "compile" target, is expected to
+     come from the imported build-common.xml. -->
+<project name="gora-hbase" default="compile">
+  <!-- directory one level above this module -->
+  <property name="project.dir" value="${basedir}/.."/>
+
+  <import file="${project.dir}/build-common.xml"/>
+</project>

Added: incubator/gora/trunk/gora-hbase/conf/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/conf/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-hbase/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/ivy/ivy.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/ivy/ivy.xml (added)
+++ incubator/gora/trunk/gora-hbase/ivy/ivy.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+
+<!-- Ivy module descriptor of gora-hbase: published artifacts and dependencies. -->
+<ivy-module version="2.0">
+    <info 
+      organisation="org.gora"
+      module="gora-hbase"
+      status="integration"/>
+
+  <!-- configurations are shared and pulled in from the parent project -->
+  <configurations>
+    <include file="${project.dir}/ivy/ivy-configurations.xml"/>
+  </configurations>
+  
+  <publications>
+    <artifact name="gora-hbase" conf="compile"/>
+    <artifact name="gora-hbase-test" conf="test"/>
+  </publications>
+
+  <dependencies>
+    <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
+    <dependency org="org.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/> 
+    <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
+
+    <!-- test dependencies -->
+    <dependency org="org.apache.hadoop" name="hadoop-test" rev="0.20.2" conf="test->default"/>
+
+  </dependencies>
+    
+</ivy-module>
+

Added: incubator/gora/trunk/gora-hbase/lib-ext/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/lib-ext/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-hbase/lib-ext/hbase-0.20.6-test.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/lib-ext/hbase-0.20.6-test.jar?rev=1006024&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/gora/trunk/gora-hbase/lib-ext/hbase-0.20.6-test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/gora/trunk/gora-hbase/lib-ext/hbase-0.20.6.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/lib-ext/hbase-0.20.6.jar?rev=1006024&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/gora/trunk/gora-hbase/lib-ext/hbase-0.20.6.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/gora/trunk/gora-hbase/lib-ext/zookeeper-3.2.2.jar
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/lib-ext/zookeeper-3.2.2.jar?rev=1006024&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/gora/trunk/gora-hbase/lib-ext/zookeeper-3.2.2.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/gora/trunk/gora-hbase/src/examples/java/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/examples/java/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseGetResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseGetResult.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseGetResult.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseGetResult.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,47 @@
+
+package org.gora.hbase.query;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.gora.hbase.store.HBaseStore;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+
+/**
+ * An {@link HBaseResult} based on the result of a HBase {@link Get} query.
+ */
+public class HBaseGetResult<K, T extends Persistent> extends HBaseResult<K,T> {
+
+  private Result result;
+  
+  public HBaseGetResult(HBaseStore<K, T> dataStore, Query<K, T> query
+      , Result result) {
+    super(dataStore, query);
+    this.result = result;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return key == null ? 0f : 1f;
+  }
+
+  @Override
+  public boolean nextInner() throws IOException {
+    if(result == null || result.getRow() == null 
+        || result.getRow().length == 0) {
+      return false;
+    }
+    if(key == null) {
+      readNext(result);
+      return key != null;
+    }
+    
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseQuery.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseQuery.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseQuery.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,22 @@
+
+package org.gora.hbase.query;
+
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.impl.QueryBase;
+import org.gora.store.DataStore;
+
+/**
+ * HBase specific implementation of the {@link Query} interface.
+ */
+public class HBaseQuery<K, T extends Persistent> extends QueryBase<K, T> {
+
+  /** Creates a query that is not bound to a data store (passes null to the base class). */
+  public HBaseQuery() {
+    super(null);
+  }
+  
+  /** Creates a query bound to the given data store. */
+  public HBaseQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+}

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseResult.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseResult.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseResult.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,34 @@
+
+package org.gora.hbase.query;
+
+import static org.gora.hbase.util.HBaseByteInterface.fromBytes;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.gora.hbase.store.HBaseStore;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.impl.ResultBase;
+
+/**
+ * Base class for Gora query result implementations backed by HBase.
+ * Note that {@code Result} inside this class is the imported HBase client
+ * row type, not the Gora result interface.
+ */
+public abstract class HBaseResult<K, T extends Persistent> 
+  extends ResultBase<K, T> {
+
+  public HBaseResult(HBaseStore<K,T> dataStore, Query<K, T> query) {
+    super(dataStore, query);
+  }
+  
+  /** Narrows the return type to {@link HBaseStore}; the constructor only accepts that type. */
+  @Override
+  public HBaseStore<K, T> getDataStore() {
+    return (HBaseStore<K, T>) super.getDataStore();
+  }
+  
+  /**
+   * Makes the given HBase row the current entry: decodes the row key into
+   * {@code key} and lets the store build {@code persistent} from the row,
+   * passing the fields of the query.
+   */
+  protected void readNext(Result result) throws IOException {
+    key = fromBytes(getKeyClass(), result.getRow());
+    persistent = getDataStore().newInstance(result, query.getFields());
+  }
+  
+}

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseScannerResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseScannerResult.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseScannerResult.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/query/HBaseScannerResult.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,54 @@
+
+package org.gora.hbase.query;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.gora.hbase.store.HBaseStore;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+
+/**
+ * Result of a query based on an HBase scanner.
+ */
+public class HBaseScannerResult<K, T extends Persistent> 
+  extends HBaseResult<K, T> {
+
+  /** Scanner that supplies the rows of this result. */
+  private final ResultScanner scanner;
+  
+  /**
+   * @param dataStore the store that opened the scanner
+   * @param query     the query this result answers
+   * @param scanner   the HBase scanner to read rows from
+   */
+  public HBaseScannerResult(HBaseStore<K,T> dataStore, Query<K, T> query, 
+      ResultScanner scanner) {
+    super(dataStore, query);
+    this.scanner = scanner;
+  }
+
+  // do not clear object in scanner result
+  @Override
+  protected void clear() { }
+  
+  /**
+   * Pulls the next row from the scanner and makes it the current entry.
+   *
+   * @return false once the scanner returns null, true otherwise
+   */
+  @Override
+  public boolean nextInner() throws IOException {
+    Result row = scanner.next();
+    if (row != null) {
+      readNext(row);
+      return true;
+    }
+    return false;
+  }
+
+  /** Closes the underlying scanner. */
+  @Override
+  public void close() throws IOException {
+    scanner.close();
+  }
+  
+  /** Progress is not tracked yet; always reports 0. */
+  @Override
+  public float getProgress() throws IOException {
+    //TODO: if limit is set, we know how far we have gone 
+    return 0f;
+  }
+  
+}

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseColumn.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseColumn.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseColumn.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseColumn.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,56 @@
+package org.gora.hbase.store;
+
+import java.util.Arrays;
+
+/**
+ * Stores tablename, family, qualifier tuple 
+ */
+class HBaseColumn {
+  
+  String tableName;
+  byte[] family;
+  byte[] qualifier;
+  
+  /**
+   * @param tableName name of the table the column belongs to; may be null
+   * @param family    column family bytes; stored by reference, not copied
+   * @param qualifier column qualifier bytes; stored by reference, not copied
+   */
+  public HBaseColumn(String tableName, byte[] family, byte[] qualifier) {
+    this.tableName = tableName;
+    this.family = family;
+    this.qualifier = qualifier;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+  
+  public byte[] getFamily() {
+    return family;
+  }
+
+  public byte[] getQualifier() {
+    return qualifier;
+  }
+
+  /** Hash over all three tuple members, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + Arrays.hashCode(family);
+    result = prime * result + Arrays.hashCode(qualifier);
+    result = prime * result + (tableName == null ? 0 : tableName.hashCode());
+    return result;
+  }
+
+  /**
+   * Two columns are equal when table name, family and qualifier all match.
+   * The table name is part of the comparison so that identically named
+   * columns of different tables are not treated as the same column.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    HBaseColumn other = (HBaseColumn) obj;
+    //null-safe comparison of the table names
+    if (tableName == null ? other.tableName != null
+        : !tableName.equals(other.tableName))
+      return false;
+    if (!Arrays.equals(family, other.family))
+      return false;
+    if (!Arrays.equals(qualifier, other.qualifier))
+      return false;
+    return true;
+  }
+}

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseMapping.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseMapping.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseMapping.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseMapping.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,99 @@
+
+package org.gora.hbase.store;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Mapping definitions for HBase: the table descriptors declared for a store,
+ * the name of its primary table, and the column each field is stored in.
+ */
+public class HBaseMapping {
+
+  // table descriptors registered so far, keyed by table name
+  private Map<String, HTableDescriptor> tableDescriptors
+    = new HashMap<String, HTableDescriptor>();
+
+  // name of the primary table
+  private String tableName;
+
+  // field name -> HBase column holding that field
+  private Map<String, HBaseColumn> columnMap =
+    new HashMap<String, HBaseColumn>();
+
+  public HBaseMapping() {
+  }
+
+  /** Sets the name of the primary table. */
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /** @return the name of the primary table */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * Registers an empty descriptor for the table unless one is already
+   * registered under that name.
+   */
+  public void addTable(String tableName) {
+    if (tableDescriptors.containsKey(tableName)) {
+      return;
+    }
+    tableDescriptors.put(tableName, new HTableDescriptor(tableName));
+  }
+
+  /** @return the descriptor registered for the primary table, or null */
+  public HTableDescriptor getTable() {
+    return getTable(tableName);
+  }
+
+  /** @return the descriptor registered under the given name, or null */
+  public HTableDescriptor getTable(String tableName) {
+    return tableDescriptors.get(tableName);
+  }
+
+  /**
+   * Looks up (or creates) the column family and applies every setting that
+   * is not null. Settings are given as strings and parsed here; a null
+   * setting leaves the descriptor's current value untouched.
+   */
+  public void addColumnFamily(String tableName, String familyName
+      , String compression, String blockCache, String blockSize, String bloomFilter
+      , String maxVersions, String timeToLive, String inMemory, String mapFileIndexInterval) {
+
+    HColumnDescriptor family = addColumnFamily(tableName, familyName);
+
+    if (compression != null) {
+      family.setCompressionType(Algorithm.valueOf(compression));
+    }
+    if (blockCache != null) {
+      family.setBlockCacheEnabled(Boolean.parseBoolean(blockCache));
+    }
+    if (blockSize != null) {
+      family.setBlocksize(Integer.parseInt(blockSize));
+    }
+    if (bloomFilter != null) {
+      family.setBloomfilter(Boolean.parseBoolean(bloomFilter));
+    }
+    if (maxVersions != null) {
+      family.setMaxVersions(Integer.parseInt(maxVersions));
+    }
+    if (timeToLive != null) {
+      family.setTimeToLive(Integer.parseInt(timeToLive));
+    }
+    if (inMemory != null) {
+      family.setInMemory(Boolean.parseBoolean(inMemory));
+    }
+    if (mapFileIndexInterval != null) {
+      family.setMapFileIndexInterval(Integer.parseInt(mapFileIndexInterval));
+    }
+
+    HTableDescriptor owner = getTable(tableName);
+    owner.addFamily(family);
+  }
+
+  /**
+   * Returns the descriptor of the named family in the given table, creating
+   * and attaching a new one when the table does not have it yet. The table
+   * must have been registered with {@link #addTable(String)} beforehand.
+   */
+  public HColumnDescriptor addColumnFamily(String tableName, String familyName) {
+    HTableDescriptor owner = getTable(tableName);
+    HColumnDescriptor existing = owner.getFamily(Bytes.toBytes(familyName));
+    if (existing != null) {
+      return existing;
+    }
+    HColumnDescriptor created = new HColumnDescriptor(familyName);
+    owner.addFamily(created);
+    return created;
+  }
+
+  /**
+   * Maps a field name to a column. A null qualifier is kept as null.
+   */
+  public void addField(String fieldName, String tableName, String family, String qualifier) {
+    byte[] familyBytes = Bytes.toBytes(family);
+    byte[] qualifierBytes = null;
+    if (qualifier != null) {
+      qualifierBytes = Bytes.toBytes(qualifier);
+    }
+
+    columnMap.put(fieldName, new HBaseColumn(tableName, familyBytes, qualifierBytes));
+  }
+
+  /** @return the column mapped to the field, or null when none is mapped */
+  public HBaseColumn getColumn(String fieldName) {
+    return columnMap.get(fieldName);
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseStore.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseStore.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/store/HBaseStore.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,585 @@
+package org.gora.hbase.store;
+
+import static org.gora.hbase.util.HBaseByteInterface.fromBytes;
+import static org.gora.hbase.util.HBaseByteInterface.toBytes;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.gora.hbase.query.HBaseGetResult;
+import org.gora.hbase.query.HBaseQuery;
+import org.gora.hbase.query.HBaseScannerResult;
+import org.gora.hbase.util.HBaseByteInterface;
+import org.gora.persistency.ListGenericArray;
+import org.gora.persistency.Persistent;
+import org.gora.persistency.State;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.StatefulMap;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.query.impl.PartitionQueryImpl;
+import org.gora.store.impl.DataStoreBase;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+
+/**
+ * DataStore for HBase.
+ *
+ * <p> Note: HBaseStore is not yet thread-safe.
+ */
+public class HBaseStore<K, T extends Persistent> extends DataStoreBase<K, T>
+implements Configurable {
+
+  /** Logger for this class. */
+  public static final Log log = LogFactory.getLog(HBaseStore.class);
+
+  /** Configuration key that initialize() consults for the mapping file name. */
+  public static final String PARSE_MAPPING_FILE_KEY = "gora.hbase.mapping.file";
+
+  /** Old mapping file name; initialize() tries it when the default one is not found. */
+  @Deprecated
+  private static final String DEPRECATED_MAPPING_FILE = "hbase-mapping.xml";
+  /** Mapping file name used when the configuration key is not set. */
+  public static final String DEFAULT_MAPPING_FILE = "gora-hbase-mapping.xml";
+
+  // used for schema operations (exists / create / disable / delete)
+  private HBaseAdmin admin;
+
+  // handle on the primary table; assigned at the end of initialize()
+  private HTable table;
+
+  // configuration returned by getConf()
+  private Configuration conf;
+
+  // when true, initialize() calls createSchema()
+  private boolean autoCreateSchema = true;
+
+  // field/table mapping parsed from the mapping file
+  private HBaseMapping mapping;
+
+  /** Creates an uninitialized store; initialize() must be called before use. */
+  public HBaseStore()  {
+  }
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+    this.conf = new HBaseConfiguration();
+
+    admin = new HBaseAdmin(new HBaseConfiguration(getConf()));
+
+    try {
+      mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
+    } catch (FileNotFoundException ex) {
+      try {
+        mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEPRECATED_MAPPING_FILE));
+        log.warn(DEPRECATED_MAPPING_FILE + " is deprecated, please rename the file to "
+            + DEFAULT_MAPPING_FILE);
+      } catch (FileNotFoundException ex1) {
+        throw ex; //throw the original exception
+      } catch (Exception ex1) {
+        log.warn(DEPRECATED_MAPPING_FILE + " is deprecated, please rename the file to "
+            + DEFAULT_MAPPING_FILE);
+        throw new RuntimeException(ex1);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if(autoCreateSchema) {
+      createSchema();
+    }
+
+    table = new HTable(mapping.getTableName());
+  }
+
+  /** @return the name of the primary table taken from the mapping */
+  @Override
+  public String getSchemaName() {
+    return mapping.getTableName();
+  }
+
+  /**
+   * Creates the primary table from the mapping's table descriptor. Does
+   * nothing when the table already exists.
+   */
+  @Override
+  public void createSchema() throws IOException {
+    final String name = mapping.getTableName();
+    if (!admin.tableExists(name)) {
+      admin.createTable(mapping.getTable());
+    }
+  }
+
+  /**
+   * Disables and deletes the primary table. When the table does not exist,
+   * only the write buffer of the open table handle (if any) is cleared.
+   */
+  @Override
+  public void deleteSchema() throws IOException {
+    final String name = mapping.getTableName();
+    if (admin.tableExists(name)) {
+      admin.disableTable(name);
+      admin.deleteTable(name);
+      return;
+    }
+    // nothing to drop; discard writes still buffered on the client side
+    if (table != null) {
+      table.getWriteBuffer().clear();
+    }
+  }
+
+  /** @return whether the primary table named by the mapping exists */
+  @Override
+  public boolean schemaExists() throws IOException {
+    return admin.tableExists(mapping.getTableName());
+  }
+
+  @Override
+  public T get(K key, String[] fields) throws IOException {
+    fields = getFieldsToQuery(fields);
+    Get get = new Get(toBytes(key));
+    addFields(get, fields);
+    Result result = table.get(get);
+    return newInstance(result, fields);
+  }
+
+  /**
+   * Writes the dirty fields of the object to the row identified by the key.
+   *
+   * <p>Fields are visited in schema order; a field is skipped unless the
+   * state manager reports it dirty. Map and array fields are spread over the
+   * qualifiers of the mapped family, all other fields go to the mapped
+   * family/qualifier. Values are collected into a single Put and a single
+   * Delete; the Put (if any) is sent before the Delete (if any).
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public void put(K key, T persistent) throws IOException {
+    Schema schema = persistent.getSchema();
+    StateManager stateManager = persistent.getStateManager();
+    byte[] keyRaw = toBytes(key);
+    Put put = new Put(keyRaw);
+    Delete delete = new Delete(keyRaw);
+    boolean hasPuts = false;
+    boolean hasDeletes = false;
+    Iterator<Field> iter = schema.getFields().iterator();
+    // i tracks the position of the field so it can index the persistent object
+    for (int i = 0; iter.hasNext(); i++) {
+      Field field = iter.next();
+      if (!stateManager.isDirty(persistent, i)) {
+        continue;
+      }
+      Type type = field.schema().getType();
+      Object o = persistent.get(i);
+      // NOTE(review): hcol is null for a field without a mapping entry, which
+      // would fail on the first hcol.getFamily() below - confirm mappings are complete
+      HBaseColumn hcol = mapping.getColumn(field.name());
+      switch(type) {
+        case MAP:
+          if(o instanceof StatefulMap) {
+            // NOTE(review): the check is for StatefulMap but the cast is to
+            // StatefulHashMap - assumes no other StatefulMap implementation is used
+            StatefulHashMap<Utf8, ?> map = (StatefulHashMap<Utf8, ?>) o;
+            // only entries with a recorded state are written or removed
+            for (Entry<Utf8, State> e : map.states().entrySet()) {
+              Utf8 mapKey = e.getKey();
+              switch (e.getValue()) {
+                case DIRTY:
+                  // qualifier is the map key's string form
+                  byte[] qual = Bytes.toBytes(mapKey.toString());
+                  byte[] val = toBytes(map.get(mapKey), field.schema().getValueType());
+                  put.add(hcol.getFamily(), qual, val);
+                  hasPuts = true;
+                  break;
+                case DELETED:
+                  // reuses the 'qual' variable declared in the DIRTY case
+                  // (both cases share the switch block's scope)
+                  qual = Bytes.toBytes(mapKey.toString());
+                  hasDeletes = true;
+                  delete.deleteColumn(hcol.getFamily(), qual);
+                  break;
+              }
+            }
+          } else {
+            // plain map: every entry is written; key and value are
+            // serialized without a schema
+            Set<Map.Entry> set = ((Map)o).entrySet();
+            for(Entry entry: set) {
+              byte[] qual = toBytes(entry.getKey());
+              byte[] val = toBytes(entry.getValue());
+              put.add(hcol.getFamily(), qual, val);
+              hasPuts = true;
+            }
+          }
+          break;
+        case ARRAY:
+          // values that are not a GenericArray are skipped without error
+          if(o instanceof GenericArray) {
+            GenericArray arr = (GenericArray) o;
+            int j=0;
+            for(Object item : arr) {
+              byte[] val = toBytes(item);
+              // qualifier is the element's index within the array
+              put.add(hcol.getFamily(), Bytes.toBytes(j++), val);
+              hasPuts = true;
+            }
+          }
+          break;
+        default:
+          put.add(hcol.getFamily(), hcol.getQualifier(), toBytes(o, field.schema()));
+          hasPuts = true;
+          break;
+      }
+    }
+    // an empty Put or Delete is never sent
+    if (hasPuts) {
+      table.put(put);
+    }
+    if (hasDeletes) {
+      table.delete(delete);
+    }
+  }
+
+  /**
+   * Not supported.
+   * @throws RuntimeException always
+   */
+  public void delete(T obj) {
+    throw new RuntimeException("Not implemented yet");
+  }
+
+  /**
+   * Deletes the object with the given key.
+   * @return always true
+   */
+  @Override
+  public boolean delete(K key) throws IOException {
+    table.delete(new Delete(toBytes(key)));
+    //HBase does not return success information and executing a get for
+    //success is a bit costly
+    return true;
+  }
+
+  /**
+   * Executes the query and deletes every matching row, or only the queried
+   * fields of each row when the query does not select all fields.
+   *
+   * @return the number of rows for which a delete was submitted
+   */
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws IOException {
+
+    String[] fields = getFieldsToQuery(query.getFields());
+    //find whether all fields are queried, which means that complete
+    //rows will be deleted
+    boolean isAllFields = Arrays.equals(fields
+        , getBeanFactory().getCachedPersistent().getFields());
+
+    org.gora.query.Result<K, T> result = query.execute();
+
+    ArrayList<Delete> deletes = new ArrayList<Delete>();
+    while(result.next()) {
+      Delete delete = new Delete(toBytes(result.getKey()));
+      deletes.add(delete);
+      if(!isAllFields) {
+        addFields(delete, query);
+      }
+    }
+    //TODO: delete by timestamp, etc
+
+    // HTable.delete(list) removes the successfully applied deletes from the
+    // list it is given, so the count has to be taken before the call.
+    final long submitted = deletes.size();
+
+    table.delete(deletes);
+
+    return submitted;
+  }
+
+  /** Flushes the writes buffered on the table handle. */
+  @Override
+  public void flush() throws IOException {
+    table.flushCommits();
+  }
+
+  /** @return a new, empty HBase query bound to this store */
+  @Override
+  public Query<K, T> newQuery() {
+    return new HBaseQuery<K, T>(this);
+  }
+
+  /**
+   * Splits the query into one partition per table region that overlaps the
+   * query's key range. Each partition carries the overlapping key range
+   * (null standing for an open bound) and the host name of the region
+   * server.
+   *
+   * @throws IOException if no table is open or the table reports no regions
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+      throws IOException {
+
+    // taken from o.a.h.hbase.mapreduce.TableInputFormatBase
+    // The table has to be checked before it is dereferenced; checking it
+    // after getStartEndKeys() could never trigger.
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
+    if (keys == null || keys.getFirst() == null ||
+        keys.getFirst().length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+
+    // The query bounds are the same for every region, so convert them once.
+    byte[] startRow = query.getStartKey() != null ? toBytes(query.getStartKey())
+        : HConstants.EMPTY_START_ROW;
+    byte[] stopRow = query.getEndKey() != null ? toBytes(query.getEndKey())
+        : HConstants.EMPTY_END_ROW;
+
+    List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>(keys.getFirst().length);
+    for (int i = 0; i < keys.getFirst().length; i++) {
+      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
+      getServerAddress().getHostname();
+
+      // determine if the given start and stop key fall into the region
+      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+          (stopRow.length == 0 ||
+              Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+
+        // clip the partition to the later of region start / query start ...
+        byte[] splitStart = (startRow.length == 0 ||
+          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0) ?
+            keys.getFirst()[i] : startRow;
+
+        // ... and to the region end or the query end
+        byte[] splitStop = (stopRow.length == 0 ||
+            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) ?
+            keys.getSecond()[i] : stopRow;
+
+        // empty boundary rows are represented as null keys
+        K startKey = Arrays.equals(HConstants.EMPTY_START_ROW, splitStart) ?
+            null : HBaseByteInterface.fromBytes(keyClass, splitStart);
+        K endKey = Arrays.equals(HConstants.EMPTY_END_ROW, splitStop) ?
+            null : HBaseByteInterface.fromBytes(keyClass, splitStop);
+
+        PartitionQuery<K, T> partition = new PartitionQueryImpl<K, T>(
+            query, startKey, endKey, regionLocation);
+
+        partitions.add(partition);
+      }
+    }
+    return partitions;
+  }
+
+  /**
+   * Runs the query. A query whose start key is non-null and equal to its
+   * end key is answered with a single Get (with the query's time range
+   * applied); every other query is answered with a scanner.
+   */
+  @Override
+  public org.gora.query.Result<K, T> execute(Query<K, T> query)
+      throws IOException {
+
+    // replace the query's field list with its normalized form
+    query.setFields(getFieldsToQuery(query.getFields()));
+
+    final boolean singleRow = query.getStartKey() != null
+        && query.getStartKey().equals(query.getEndKey());
+
+    if (!singleRow) {
+      ResultScanner scanner = createScanner(query);
+      return new HBaseScannerResult<K,T>(this,query, scanner);
+    }
+
+    Get rowGet = new Get(toBytes(query.getStartKey()));
+    addFields(rowGet, query.getFields());
+    addTimeRange(rowGet, query);
+    Result row = table.get(rowGet);
+    return new HBaseGetResult<K,T>(this, query, row);
+  }
+
+  public ResultScanner createScanner(Query<K, T> query)
+  throws IOException {
+    final Scan scan = new Scan();
+    if (query.getStartKey() != null) {
+      scan.setStartRow(toBytes(query.getStartKey()));
+    }
+    if (query.getEndKey() != null) {
+      scan.setStopRow(toBytes(query.getEndKey()));
+    }
+    addFields(scan, query);
+
+    return table.getScanner(scan);
+  }
+
+  /**
+   * Restricts the Get to the columns of the given fields: the whole family
+   * for map and array fields, the single mapped column otherwise.
+   */
+  private void addFields(Get get, String[] fieldNames) {
+    for (String name : fieldNames) {
+      HBaseColumn column = mapping.getColumn(name);
+      Type kind = fieldMap.get(name).schema().getType();
+
+      if (kind == Type.MAP || kind == Type.ARRAY) {
+        get.addFamily(column.family);
+      } else {
+        get.addColumn(column.family, column.qualifier);
+      }
+    }
+  }
+
+  /**
+   * Restricts the Scan to the columns of the query's fields: the whole
+   * family for map and array fields, the single mapped column otherwise.
+   */
+  private void addFields(Scan scan, Query<K,T> query)
+  throws IOException {
+    for (String name : query.getFields()) {
+      HBaseColumn column = mapping.getColumn(name);
+      Type kind = fieldMap.get(name).schema().getType();
+
+      if (kind == Type.MAP || kind == Type.ARRAY) {
+        scan.addFamily(column.family);
+      } else {
+        scan.addColumn(column.family, column.qualifier);
+      }
+    }
+  }
+
+  //TODO: HBase Get, Scan, Delete should extend some common interface with addFamily, etc
+  /**
+   * Narrows the Delete to the columns of the query's fields: the whole
+   * family for map and array fields, the single mapped column otherwise.
+   */
+  private void addFields(Delete delete, Query<K,T> query)
+    throws IOException {
+    for (String name : query.getFields()) {
+      HBaseColumn column = mapping.getColumn(name);
+      Type kind = fieldMap.get(name).schema().getType();
+
+      if (kind == Type.MAP || kind == Type.ARRAY) {
+        delete.deleteFamily(column.family);
+      } else {
+        delete.deleteColumn(column.family, column.qualifier);
+      }
+    }
+  }
+
+  /**
+   * Applies the query's time constraints to the Get. Nothing is set when
+   * neither time is positive. Equal times select that exact timestamp;
+   * otherwise a range is set, with a non-positive start replaced by 0 and a
+   * non-positive end replaced by Long.MAX_VALUE.
+   */
+  private void addTimeRange(Get get, Query<K, T> query) throws IOException {
+    boolean unconstrained = query.getStartTime() <= 0 && query.getEndTime() <= 0;
+    if (unconstrained) {
+      return;
+    }
+    if (query.getStartTime() == query.getEndTime()) {
+      get.setTimeStamp(query.getStartTime());
+      return;
+    }
+    long from = 0;
+    if (query.getStartTime() > 0) {
+      from = query.getStartTime();
+    }
+    long to = Long.MAX_VALUE;
+    if (query.getEndTime() > 0) {
+      to = query.getEndTime();
+    }
+    get.setTimeRange(from, to);
+  }
+
+  /**
+   * Builds a persistent object from an HBase result, filling the given
+   * fields. Fields with no data in the result are left untouched. The
+   * object's dirty state is cleared before it is returned.
+   *
+   * @return the populated object, or null when the result is null or empty
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public T newInstance(Result result, String[] fields)
+  throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+
+    T persistent = newPersistent();
+    StateManager stateManager = persistent.getStateManager();
+    for (String name : fields) {
+      HBaseColumn column = mapping.getColumn(name);
+      Field field = fieldMap.get(name);
+      Schema fieldSchema = field.schema();
+      Type kind = fieldSchema.getType();
+
+      if (kind == Type.MAP) {
+        // one map entry per qualifier found in the family
+        NavigableMap<byte[], byte[]> cells =
+          result.getNoVersionMap().get(column.getFamily());
+        if (cells == null) {
+          continue;
+        }
+        Schema valueSchema = fieldSchema.getValueType();
+        Map decoded = new HashMap();
+        for (Entry<byte[], byte[]> cell : cells.entrySet()) {
+          Utf8 mapKey = new Utf8(Bytes.toString(cell.getKey()));
+          decoded.put(mapKey, fromBytes(valueSchema, cell.getValue()));
+        }
+        setField(persistent, field, decoded);
+      } else if (kind == Type.ARRAY) {
+        // one array element per qualifier, in the family map's iteration order
+        NavigableMap<byte[], byte[]> cells = result.getFamilyMap(column.getFamily());
+        if (cells == null) {
+          continue;
+        }
+        Schema elementSchema = fieldSchema.getElementType();
+        ArrayList elements = new ArrayList();
+        for (Entry<byte[], byte[]> cell : cells.entrySet()) {
+          elements.add(fromBytes(elementSchema, cell.getValue()));
+        }
+        ListGenericArray array = new ListGenericArray(fieldSchema, elements);
+        setField(persistent, field, array);
+      } else {
+        byte[] raw =
+          result.getValue(column.getFamily(), column.getQualifier());
+        if (raw == null) {
+          continue;
+        }
+        setField(persistent, field, raw);
+      }
+    }
+    stateManager.clearDirty(persistent);
+    return persistent;
+  }
+
+  /** Stores the map in the field's slot, wrapped in a StatefulHashMap. */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void setField(T persistent, Field field, Map map) {
+    int slot = field.pos();
+    StatefulHashMap wrapped = new StatefulHashMap(map);
+    persistent.put(slot, wrapped);
+  }
+
+  /** Decodes the bytes with the field's schema and stores the value in the field's slot. */
+  private void setField(T persistent, Field field, byte[] val)
+  throws IOException {
+    int slot = field.pos();
+    Object decoded = fromBytes(field.schema(), val);
+    persistent.put(slot, decoded);
+  }
+
+  /** Stores the array in the field's slot as is. */
+  @SuppressWarnings("rawtypes")
+  private void setField(T persistent, Field field, GenericArray list) {
+    int slot = field.pos();
+    persistent.put(slot, list);
+  }
+
+  /**
+   * Parses the mapping file from the class path.
+   *
+   * <p>Every {@code <table>} element registers a table and the column
+   * families declared by its {@code <field>} children. The first
+   * {@code <class>} element whose {@code keyClass} and {@code name} match
+   * this store's key and persistent classes selects the primary table and
+   * defines the field-to-column mapping; families referenced by its fields
+   * are added implicitly.
+   *
+   * @param filename class path resource name of the mapping file
+   * @return the parsed mapping
+   * @throws FileNotFoundException if the resource is not on the class path
+   * @throws IOException if the file cannot be read or parsed
+   */
+  @SuppressWarnings("unchecked")
+  private HBaseMapping readMapping(String filename) throws IOException {
+
+    HBaseMapping mapping = new HBaseMapping();
+
+    // getResourceAsStream() signals a missing resource with null; report it
+    // as FileNotFoundException so callers can tell it apart from parse errors
+    java.io.InputStream mappingStream =
+        getClass().getClassLoader().getResourceAsStream(filename);
+    if (mappingStream == null) {
+      throw new FileNotFoundException("Mapping file not found on classpath: " + filename);
+    }
+
+    try {
+      SAXBuilder builder = new SAXBuilder();
+      Document doc = builder.build(mappingStream);
+      Element root = doc.getRootElement();
+
+      List<Element> tableElements = root.getChildren("table");
+      for(Element tableElement : tableElements) {
+        String tableName = tableElement.getAttributeValue("name");
+        mapping.addTable(tableName);
+
+        List<Element> fieldElements = tableElement.getChildren("field");
+        for(Element fieldElement : fieldElements) {
+          String familyName  = fieldElement.getAttributeValue("name");
+          String compression = fieldElement.getAttributeValue("compression");
+          String blockCache  = fieldElement.getAttributeValue("blockCache");
+          String blockSize   = fieldElement.getAttributeValue("blockSize");
+          String bloomFilter = fieldElement.getAttributeValue("bloomFilter");
+          String maxVersions = fieldElement.getAttributeValue("maxVersions");
+          String timeToLive  = fieldElement.getAttributeValue("timeToLive");
+          String inMemory    = fieldElement.getAttributeValue("inMemory");
+          // Like the other family settings this belongs on the <field>
+          // element; the <table> element is still consulted as a fallback so
+          // that mapping files written for the earlier lookup keep working.
+          String mapFileIndexInterval = fieldElement.getAttributeValue("mapFileIndexInterval");
+          if (mapFileIndexInterval == null) {
+            mapFileIndexInterval = tableElement.getAttributeValue("mapFileIndexInterval");
+          }
+
+          mapping.addColumnFamily(tableName, familyName, compression, blockCache, blockSize
+              , bloomFilter, maxVersions, timeToLive, inMemory, mapFileIndexInterval);
+        }
+      }
+
+      List<Element> classElements = root.getChildren("class");
+      for(Element classElement: classElements) {
+        if(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
+            && classElement.getAttributeValue("name").equals(
+                persistentClass.getCanonicalName())) {
+
+          String tableName = getSchemaName(classElement.getAttributeValue("table"), persistentClass);
+          mapping.addTable(tableName);
+          mapping.setTableName(tableName);
+
+          List<Element> fields = classElement.getChildren("field");
+          for(Element field:fields) {
+            String fieldName =  field.getAttributeValue("name");
+            String family =  field.getAttributeValue("family");
+            String qualifier = field.getAttributeValue("qualifier");
+            mapping.addField(fieldName, mapping.getTableName(), family, qualifier);
+            mapping.addColumnFamily(mapping.getTableName(), family);//implicit family definition
+          }
+
+          // only the first matching <class> element is used
+          break;
+        }
+      }
+    } catch(IOException ex) {
+      throw ex;
+    } catch(Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      try {
+        mappingStream.close();
+      } catch (IOException ignored) {
+        // the stream was only read from; a failure to close it must not
+        // hide the parse result or the original exception
+      }
+    }
+
+    return mapping;
+  }
+
+  /**
+   * Flushes pending writes and closes the table handle. Does nothing when no
+   * table has been opened, e.g. when the store was never initialized.
+   */
+  @Override
+  public void close() throws IOException {
+    // flush() dereferences the table, so the null guard has to cover it too
+    if(table != null) {
+      flush();
+      table.close();
+    }
+  }
+
+  /** @return the configuration currently held by this store; may be null if none was set */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** Replaces the configuration held by this store. */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}



Mime
View raw message