incubator-gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1006024 [7/8] - in /incubator/gora: branches/ tags/ trunk/ trunk/bin/ trunk/conf/ trunk/docs/ trunk/gora-cassandra/ trunk/gora-cassandra/ivy/ trunk/gora-cassandra/lib-ext/ trunk/gora-cassandra/src/ trunk/gora-cassandra/src/examples/ trunk/...
Date Fri, 08 Oct 2010 21:17:17 GMT
Added: incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/util/HBaseByteInterface.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/util/HBaseByteInterface.java (added)
+++ incubator/gora/trunk/gora-hbase/src/main/java/org/gora/hbase/util/HBaseByteInterface.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,130 @@
+
+package org.gora.hbase.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.gora.util.AvroUtils;
+
+/**
+ * Contains utility methods for byte[] <-> field conversions between the
+ * raw cell values stored in HBase and the Avro / Java values used by Gora.
+ *
+ * <p>Two families of conversions are offered: schema driven (an Avro
+ * {@link Schema} selects the encoding) and class driven (the Java class of
+ * the value selects the encoding).
+ */
+public class HBaseByteInterface {
+
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  // NOTE(review): writer and reader are shared static instances whose schema
+  // is re-set on every RECORD conversion; this looks unsafe when records are
+  // converted from several threads at once - confirm against the callers.
+  @SuppressWarnings("rawtypes")
+  private static final SpecificDatumWriter writer =
+    new SpecificDatumWriter();
+
+  @SuppressWarnings("rawtypes")
+  private static final SpecificDatumReader reader =
+    new SpecificDatumReader();
+
+  /**
+   * Decodes a cell value according to an Avro schema.
+   *
+   * @param schema the Avro schema describing the stored value
+   * @param val the raw bytes; ENUM and BOOLEAN read only the first byte
+   * @return the decoded value (Utf8, ByteBuffer, boxed primitive, enum
+   *         value or record, depending on the schema type)
+   * @throws IOException if decoding a RECORD fails
+   * @throws RuntimeException if the schema type is not supported
+   */
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  public static Object fromBytes(Schema schema, byte[] val) throws IOException {
+    Type type = schema.getType();
+    switch (type) {
+    case ENUM:    return AvroUtils.getEnumValue(schema, val[0]);
+    case STRING:  return new Utf8(Bytes.toString(val));
+    case BYTES:   return ByteBuffer.wrap(val);
+    case INT:     return Bytes.toInt(val);
+    case LONG:    return Bytes.toLong(val);
+    case FLOAT:   return Bytes.toFloat(val);
+    case DOUBLE:  return Bytes.toDouble(val);
+    case BOOLEAN: return val[0] != 0;
+    case RECORD:
+      // TODO: This is TOO SLOW... OPTIMIZE
+      reader.setSchema(schema);
+      reader.setExpected(schema);
+      BinaryDecoder decoder = new BinaryDecoder(new ByteArrayInputStream(val));
+      return reader.read(null, decoder);
+    default: throw new RuntimeException("Unknown type: "+type);
+    }
+  }
+
+  /**
+   * Decodes a cell value into an instance of the requested Java class.
+   *
+   * @param clazz the target class: a primitive type or its wrapper,
+   *        String or Utf8
+   * @param val the raw bytes; Byte and Boolean read only the first byte
+   * @return the decoded value
+   * @throws RuntimeException if the class is not supported
+   */
+  @SuppressWarnings("unchecked")
+  public static <K> K fromBytes(Class<K> clazz, byte[] val) {
+    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+      return (K) Byte.valueOf(val[0]);
+    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+      return (K) Boolean.valueOf(val[0] != 0);
+    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+      return (K) Short.valueOf(Bytes.toShort(val));
+    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+      return (K) Integer.valueOf(Bytes.toInt(val));
+    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+      return (K) Long.valueOf(Bytes.toLong(val));
+    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+      return (K) Float.valueOf(Bytes.toFloat(val));
+    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+      return (K) Double.valueOf(Bytes.toDouble(val));
+    } else if (clazz.equals(String.class)) {
+      return (K) Bytes.toString(val);
+    } else if (clazz.equals(Utf8.class)) {
+      return (K) new Utf8(Bytes.toString(val));
+    }
+    throw new RuntimeException("Can't parse data as class: " + clazz);
+  }
+
+  /**
+   * Encodes a value based on its runtime class.
+   *
+   * @param o the value to encode: an enum constant, a boxed primitive,
+   *        a String or a Utf8; must not be null
+   * @return the encoded bytes; enums are stored as a single ordinal byte
+   * @throws RuntimeException if the class of the value is not supported
+   */
+  public static byte[] toBytes(Object o) {
+    // An enum constant's class is the enum type itself (or an anonymous
+    // subclass of it), never Enum.class, so this has to be an instanceof test.
+    if (o instanceof Enum) {
+      return new byte[] { (byte)((Enum<?>) o).ordinal() }; // yeah, yeah it's a hack
+    } else if (o instanceof Byte) {
+      return new byte[] { (Byte) o };
+    } else if (o instanceof Boolean) {
+      return new byte[] { ((Boolean) o ? (byte) 1 :(byte) 0)};
+    } else if (o instanceof Short) {
+      return Bytes.toBytes((Short) o);
+    } else if (o instanceof Integer) {
+      return Bytes.toBytes((Integer) o);
+    } else if (o instanceof Long) {
+      return Bytes.toBytes((Long) o);
+    } else if (o instanceof Float) {
+      return Bytes.toBytes((Float) o);
+    } else if (o instanceof Double) {
+      return Bytes.toBytes((Double) o);
+    } else if (o instanceof String) {
+      return Bytes.toBytes((String) o);
+    } else if (o instanceof Utf8) {
+      // Utf8.getBytes() exposes the backing array, which can be longer than
+      // the valid content; go through the String form like the schema based
+      // toBytes does.
+      return Bytes.toBytes(o.toString());
+    }
+    throw new RuntimeException("Can't parse data as class: " + o.getClass());
+  }
+
+  /**
+   * Encodes a value according to an Avro schema.
+   *
+   * @param o the value to encode; its Java type must match the schema type
+   * @param schema the Avro schema describing the value
+   * @return the encoded bytes
+   * @throws IOException if encoding a RECORD fails
+   * @throws RuntimeException if the schema type is not supported
+   */
+  @SuppressWarnings("unchecked")
+  public static byte[] toBytes(Object o, Schema schema) throws IOException {
+    Type type = schema.getType();
+    switch (type) {
+    case STRING:  return Bytes.toBytes(((Utf8)o).toString()); // TODO: maybe ((Utf8)o).getBytes(); ?
+    case BYTES:   return copyRemaining((ByteBuffer)o);
+    case INT:     return Bytes.toBytes((Integer)o);
+    case LONG:    return Bytes.toBytes((Long)o);
+    case FLOAT:   return Bytes.toBytes((Float)o);
+    case DOUBLE:  return Bytes.toBytes((Double)o);
+    case BOOLEAN: return (Boolean)o ? new byte[] {1} : new byte[] {0};
+    case ENUM:    return new byte[] { (byte)((Enum<?>) o).ordinal() };
+    case RECORD:
+      // TODO: This is TOO SLOW... OPTIMIZE
+      writer.setSchema(schema);
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      BinaryEncoder encoder = new BinaryEncoder(os);
+      writer.write(o, encoder);
+      encoder.flush();
+      return os.toByteArray();
+    default: throw new RuntimeException("Unknown type: "+type);
+    }
+  }
+
+  /**
+   * Copies the readable window (position to limit) of a buffer without
+   * moving the caller's position. Unlike ByteBuffer.array() this honours
+   * slices and array offsets and also works for buffers without an
+   * accessible backing array.
+   */
+  private static byte[] copyRemaining(ByteBuffer buffer) {
+    ByteBuffer view = buffer.duplicate();
+    byte[] copy = new byte[view.remaining()];
+    view.get(copy);
+    return copy;
+  }
+}

Added: incubator/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml (added)
+++ incubator/gora/trunk/gora-hbase/src/test/conf/gora-hbase-mapping.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<gora-orm>
+
+  <table name="Employee"> <!-- optional descriptors for tables -->
+    <family name="info"/> <!-- This can also have params like compression, bloom filters -->
+  </table>
+
+  <table name="WebPage">
+    <family name="common"/>
+    <family name="content"/>
+    <family name="parsedContent"/>
+    <family name="outlinks"/>
+  </table>
+
+  <class name="org.gora.examples.generated.Employee" keyClass="java.lang.String" table="Employee">
+    <field name="name" family="info" qualifier="nm"/>
+    <field name="dateOfBirth" family="info" qualifier="db"/>
+    <field name="ssn" family="info" qualifier="sn"/>
+    <field name="salary" family="info" qualifier="sl"/>
+  </class>
+
+  <class name="org.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">
+    <field name="url" family="common" qualifier="u"/>
+    <field name="content" family="content"/>
+    <field name="parsedContent" family="parsedContent"/>
+    <field name="outlinks" family="outlinks"/>
+    <field name="metadata" family="common" qualifier="metadata"/>
+  </class>
+
+
+  <class name="org.gora.examples.generated.TokenDatum" keyClass="java.lang.String">
+    <field name="count" family="common" qualifier="count"/>
+  </class>
+
+</gora-orm>

Added: incubator/gora/trunk/gora-hbase/src/test/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/test/conf/hbase-site.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/test/conf/hbase-site.xml (added)
+++ incubator/gora/trunk/gora-hbase/src/test/conf/hbase-site.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,137 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>hbase.regionserver.msginterval</name>
+    <value>1000</value>
+    <description>Interval between messages from the RegionServer to HMaster
+    in milliseconds.  Default is 15. Set this value low if you want unit
+    tests to be responsive.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.pause</name>
+    <value>5000</value>
+    <description>General client pause value.  Used mostly as value to wait
+    before running a retry of a failed get, region lookup, etc.</description>
+  </property>
+  <property>
+    <name>hbase.master.meta.thread.rescanfrequency</name>
+    <value>10000</value>
+    <description>How long the HMaster sleeps (in milliseconds) between scans of
+    the root and meta tables.
+    </description>
+  </property>
+  <property>
+    <name>hbase.server.thread.wakefrequency</name>
+    <value>1000</value>
+    <description>Time to sleep in between searches for work (in milliseconds).
+    Used as sleep interval by service threads such as META scanner and log roller.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>5</value>
+    <description>Count of RPC Server instances spun up on RegionServers.
+    The same property is used by the HMaster for the count of master handlers.
+    Default is 10.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.period</name>
+    <value>6000</value>
+    <description>Length of time the master will wait before timing out a region
+    server lease. Since region servers report in every second (see above), this
+    value has been reduced so that the master will notice a dead region server
+    sooner. The default is 30 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.info.port</name>
+    <value>-1</value>
+    <description>The port for the HBase master web UI.
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port</name>
+    <value>-1</value>
+    <description>The port for the HBase regionserver web UI.
+    Set to -1 if you do not want the info server to run.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.info.port.auto</name>
+    <value>true</value>
+    <description>Info server auto port bind. Enables automatic port
+    search if hbase.regionserver.info.port is already in use.
+    Enabled for testing to run multiple tests on one machine.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.lease.thread.wakefrequency</name>
+    <value>3000</value>
+    <description>The interval between checks for expired region server leases.
+    This value has been reduced due to the other reduced values above so that
+    the master will notice a dead region server sooner. The default is 15 seconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>10000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.safemode</name>
+    <value>false</value>
+    <description>
+    Turn on/off safe mode in region server. Always on for production, always off
+    for tests.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>67108864</value>
+    <description>
+    Maximum desired file size for an HRegion.  If filesize exceeds
+    value + (value / 2), the HRegion is split in two.  Default: 256M.
+
+    Keep the maximum filesize small so we split more often in tests.
+    </description>
+  </property>
+  <property>
+    <name>hadoop.log.dir</name>
+    <value>${user.dir}/../logs</value>
+  </property>
+  <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>21810</value>
+    <description>Property from ZooKeeper's config zoo.cfg.
+    The port at which the clients will connect.
+    </description>
+  </property>
+</configuration>

Added: incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/GoraHBaseTestDriver.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/GoraHBaseTestDriver.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/GoraHBaseTestDriver.java (added)
+++ incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/GoraHBaseTestDriver.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,68 @@
+
+package org.gora.hbase;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.gora.GoraTestDriver;
+import org.gora.hbase.store.HBaseStore;
+
+/**
+ * Helper class for third party tests using the gora-hbase backend.
+ * Starts an HBase mini cluster before the test class runs and shuts it
+ * down afterwards.
+ * @see GoraTestDriver
+ */
+public class GoraHBaseTestDriver extends GoraTestDriver {
+
+  protected HBaseTestingUtility hbaseUtil;
+  protected int numServers = 1;
+
+  public GoraHBaseTestDriver() {
+    super(HBaseStore.class);
+    hbaseUtil = new HBaseTestingUtility();
+  }
+
+  /**
+   * @param numServers the number passed to the mini cluster on start up;
+   *        only has an effect when set before {@link #setUpClass()}
+   */
+  public void setNumServers(int numServers) {
+    this.numServers = numServers;
+  }
+
+  /**
+   * @return the number passed to the mini cluster on start up
+   */
+  public int getNumServers() {
+    return numServers;
+  }
+
+  /** Runs the parent class set up and then starts the HBase mini cluster. */
+  @Override
+  public void setUpClass() throws Exception {
+    super.setUpClass();
+    log.info("Starting HBase cluster");
+    hbaseUtil.startMiniCluster(numServers);
+  }
+
+  /**
+   * Runs the parent class tear down and then stops the HBase mini cluster.
+   * The cluster is stopped even when the parent tear down fails, so a
+   * failing test class does not leave a cluster running.
+   */
+  @Override
+  public void tearDownClass() throws Exception {
+    try {
+      super.tearDownClass();
+    } finally {
+      log.info("Stoping HBase cluster");
+      hbaseUtil.shutdownMiniCluster();
+    }
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /** Disables and then deletes every table the HBase admin lists. */
+  public void deleteAllTables() throws Exception {
+    HBaseAdmin admin = hbaseUtil.getHBaseAdmin();
+    for(HTableDescriptor table:admin.listTables()) {
+      // a table has to be disabled before it is deleted
+      admin.disableTable(table.getName());
+      admin.deleteTable(table.getName());
+    }
+  }
+
+  /**
+   * @return the configuration of the testing utility
+   */
+  public HBaseConfiguration getConf() {
+    return hbaseUtil.getConfiguration();
+  }
+
+  /**
+   * @return the underlying HBase testing utility
+   */
+  public HBaseTestingUtility getHbaseUtil() {
+    return hbaseUtil;
+  }
+
+}

Added: incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/mapreduce/TestHBaseStoreMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/mapreduce/TestHBaseStoreMapReduce.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/mapreduce/TestHBaseStoreMapReduce.java (added)
+++ incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/mapreduce/TestHBaseStoreMapReduce.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,52 @@
+
+package org.gora.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.gora.examples.generated.TokenDatum;
+import org.gora.examples.generated.WebPage;
+import org.gora.hbase.store.HBaseStore;
+import org.gora.mapreduce.MapReduceTestUtils;
+import org.gora.store.DataStoreFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests related to {@link HBaseStore} using mapreduce.
+ */
+public class TestHBaseStoreMapReduce extends HBaseClusterTestCase{
+
+  private HBaseStore<String, WebPage> webPageStore;
+  private HBaseStore<String, TokenDatum> tokenStore;
+
+  /** Starts the cluster via the parent class, then opens both data stores. */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    webPageStore = DataStoreFactory.getDataStore(
+        HBaseStore.class, String.class, WebPage.class);
+    tokenStore = DataStoreFactory.getDataStore(HBaseStore.class,
+        String.class, TokenDatum.class);
+  }
+
+  /**
+   * Closes both data stores and then runs the parent tear down. The stores
+   * are closed first so they are released before the parent tears down, and
+   * the parent tear down runs even when closing a store fails.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    try {
+      // either store may still be null when setUp() failed half way
+      if (webPageStore != null) {
+        webPageStore.close();
+      }
+    } finally {
+      try {
+        if (tokenStore != null) {
+          tokenStore.close();
+        }
+      } finally {
+        super.tearDown();
+      }
+    }
+  }
+
+  @Test
+  public void testCountQuery() throws Exception {
+    MapReduceTestUtils.testCountQuery(webPageStore, conf);
+  }
+
+  @Test
+  public void testWordCount() throws Exception {
+    MapReduceTestUtils.testWordCount(conf, webPageStore, tokenStore);
+  }
+
+  /** Runs the count query test stand-alone, outside of a test runner. */
+  public static void main(String[] args) throws Exception {
+    TestHBaseStoreMapReduce test = new TestHBaseStoreMapReduce();
+    test.setUp();
+    try {
+      test.testCountQuery();
+    } finally {
+      test.tearDown();
+    }
+  }
+}

Added: incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/store/TestHBaseStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/store/TestHBaseStore.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/store/TestHBaseStore.java (added)
+++ incubator/gora/trunk/gora-hbase/src/test/java/org/gora/hbase/store/TestHBaseStore.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,105 @@
+
+package org.gora.hbase.store;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.gora.examples.generated.Employee;
+import org.gora.examples.generated.WebPage;
+import org.gora.hbase.GoraHBaseTestDriver;
+import org.gora.store.DataStore;
+import org.gora.store.DataStoreFactory;
+import org.gora.store.DataStoreTestBase;
+
+/**
+ * Test case for HBaseStore. The assertPut* hooks read the stored row
+ * straight from HBase to check how values were laid out in the table.
+ */
+public class TestHBaseStore extends DataStoreTestBase {
+
+  static {
+    setTestDriver(new GoraHBaseTestDriver());
+  }
+
+  @Override
+  protected DataStore<String, Employee> createEmployeeDataStore()
+      throws IOException {
+    return DataStoreFactory.createDataStore(HBaseStore.class, String.class,
+        Employee.class);
+  }
+
+  @Override
+  protected DataStore<String, WebPage> createWebPageDataStore()
+      throws IOException {
+    return DataStoreFactory.createDataStore(HBaseStore.class, String.class,
+        WebPage.class);
+  }
+
+  public GoraHBaseTestDriver getTestDriver() {
+    return (GoraHBaseTestDriver) testDriver;
+  }
+
+  /** Asserts that an HBase table with the given name exists. */
+  @Override
+  public void assertSchemaExists(String schemaName) throws Exception {
+    HBaseAdmin admin = getTestDriver().getHbaseUtil().getHBaseAdmin();
+    Assert.assertTrue(admin.tableExists(schemaName));
+  }
+
+  /**
+   * Checks the "parsedContent" family of the example row: four cells, with
+   * the expected values under the int qualifiers 0 and 3.
+   */
+  @Override
+  public void assertPutArray() throws IOException {
+    org.apache.hadoop.hbase.client.Result result = readExampleWebPageRow();
+    byte[] family = Bytes.toBytes("parsedContent");
+
+    // expected value goes first so a failure message reads correctly
+    Assert.assertEquals(4, result.getFamilyMap(family).size());
+    Assert.assertTrue(Arrays.equals(result.getValue(family
+        ,Bytes.toBytes(0)), Bytes.toBytes("example")));
+
+    Assert.assertTrue(Arrays.equals(result.getValue(family
+        ,Bytes.toBytes(3)), Bytes.toBytes("example.com")));
+  }
+
+
+  /** Checks that the "content" family of the example row holds the given bytes. */
+  @Override
+  public void assertPutBytes(byte[] contentBytes) throws IOException {
+    org.apache.hadoop.hbase.client.Result result = readExampleWebPageRow();
+
+    byte[] actualBytes = result.getValue(Bytes.toBytes("content"), null);
+    Assert.assertNotNull(actualBytes);
+    Assert.assertTrue(Arrays.equals(contentBytes, actualBytes));
+  }
+
+  /** Checks that the "outlinks" family of the example row holds the second anchor. */
+  @Override
+  public void assertPutMap() throws IOException {
+    org.apache.hadoop.hbase.client.Result result = readExampleWebPageRow();
+
+    byte[] anchor2Raw = result.getValue(Bytes.toBytes("outlinks")
+        , Bytes.toBytes("http://example2.com"));
+    Assert.assertNotNull(anchor2Raw);
+    String anchor2 = Bytes.toString(anchor2Raw);
+    Assert.assertEquals("anchor2", anchor2);
+  }
+
+  /**
+   * Reads the row "com.example/http" from the "WebPage" table. The table
+   * handle is closed in a finally block so it is released even when the
+   * read fails.
+   */
+  private org.apache.hadoop.hbase.client.Result readExampleWebPageRow()
+      throws IOException {
+    HTable table = new HTable("WebPage");
+    try {
+      return table.get(new Get(Bytes.toBytes("com.example/http")));
+    } finally {
+      table.close();
+    }
+  }
+
+  /** Runs the query test stand-alone, always tearing down what was set up. */
+  public static void main(String[] args) throws Exception {
+    TestHBaseStore test = new TestHBaseStore();
+    test.setUpClass();
+    try {
+      test.setUp();
+      try {
+        test.testQuery();
+      } finally {
+        test.tearDown();
+      }
+    } finally {
+      test.tearDownClass();
+    }
+  }
+}

Added: incubator/gora/trunk/gora-sql/build.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/build.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/build.xml (added)
+++ incubator/gora/trunk/gora-sql/build.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project name="gora-sql" default="compile">
+  <property name="project.dir" value="${basedir}/.."/>
+
+  <import file="${project.dir}/build-common.xml"/>
+</project>

Added: incubator/gora/trunk/gora-sql/conf/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/conf/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-sql/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/ivy/ivy.xml?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/ivy/ivy.xml (added)
+++ incubator/gora/trunk/gora-sql/ivy/ivy.xml Fri Oct  8 21:17:10 2010
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+
+<ivy-module version="2.0">
+    <info 
+      organisation="org.gora"
+      module="gora-sql"
+      status="integration"/>
+
+  <configurations>
+    <include file="${project.dir}/ivy/ivy-configurations.xml"/>
+  </configurations>
+  
+  <publications>
+    <artifact name="gora-sql" conf="compile"/>
+    <artifact name="gora-sql-test" conf="test"/>
+  </publications>
+
+  <dependencies>
+    <!-- conf="*->@" means every conf is mapped to the conf of the same name of the artifact-->
+    <dependency org="org.gora" name="gora-core" rev="latest.integration" changing="true" conf="*->@"/> 
+    <dependency org="org.jdom" name="jdom" rev="1.1" conf="*->master"/>
+    <dependency org="com.healthmarketscience.sqlbuilder" name="sqlbuilder" rev="2.0.6" conf="*->default"/>
+
+    <!-- test dependencies -->
+    <dependency org="org.hsqldb" name="hsqldb" rev="2.0.0" conf="test->default"/>
+
+  </dependencies>
+    
+</ivy-module>
+

Added: incubator/gora/trunk/gora-sql/lib-ext/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/lib-ext/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-sql/src/examples/java/.gitignore
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/examples/java/.gitignore?rev=1006024&view=auto
==============================================================================
    (empty)

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlQuery.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlQuery.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlQuery.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlQuery.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,21 @@
+
+package org.gora.sql.query;
+
+import org.gora.persistency.Persistent;
+import org.gora.query.impl.QueryBase;
+import org.gora.sql.store.SqlStore;
+
+/**
+ * {@link QueryBase} specialisation used for queries against an
+ * {@link SqlStore}. Adds no state or behaviour of its own.
+ */
+public class SqlQuery<K, T extends Persistent> extends QueryBase<K, T> {
+
+  /**
+   * Creates a query bound to the given store.
+   * @param dataStore the SQL store this query belongs to
+   */
+  public SqlQuery(final SqlStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+  /**
+   * Creates a query that is not bound to any store; null is handed to
+   * the parent constructor.
+   */
+  public SqlQuery() {
+    super(null);
+  }
+
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlResult.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlResult.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlResult.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/query/SqlResult.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,55 @@
+
+package org.gora.sql.query;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.query.impl.ResultBase;
+import org.gora.sql.store.SqlStore;
+import org.gora.sql.util.SqlUtils;
+import org.gora.store.DataStore;
+
+/**
+ * Result of a query against an {@link SqlStore}, backed by an open JDBC
+ * {@link ResultSet} and the {@link PreparedStatement} that produced it.
+ */
+public class SqlResult<K, T extends Persistent> extends ResultBase<K, T> {
+
+  private final ResultSet resultSet;
+  private final PreparedStatement statement;
+
+  /**
+   * @param dataStore the store the query ran against; it is cast to
+   *        SqlStore when rows are read
+   * @param query the query that produced this result
+   * @param resultSet the open JDBC result set to iterate
+   * @param statement the statement owning the result set; closed together
+   *        with it
+   */
+  public SqlResult(DataStore<K, T> dataStore, Query<K, T> query
+      , ResultSet resultSet, PreparedStatement statement) {
+    super(dataStore, query);
+    this.resultSet = resultSet;
+    this.statement = statement;
+  }
+
+  /**
+   * Advances to the next row and loads its key and object.
+   *
+   * @return true if a row was read, false if the result set is exhausted
+   *         (in which case this result has been closed)
+   * @throws IOException if reading fails; non-IO failures are wrapped
+   */
+  @Override
+  protected boolean nextInner() throws IOException {
+    try {
+      if(!resultSet.next()) { //no matching result
+        close();
+        return false;
+      }
+
+      SqlStore<K, T> sqlStore = ((SqlStore<K,T>)dataStore);
+      key = sqlStore.readPrimaryKey(resultSet);
+      persistent = sqlStore.readObject(resultSet, persistent, query.getFields());
+
+      return true;
+    } catch (IOException ex) {
+      // already the declared type: rethrow as is instead of wrapping it in
+      // a second IOException
+      throw ex;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /** Closes the result set and then the statement via SqlUtils. */
+  @Override
+  public void close() throws IOException {
+    SqlUtils.close(resultSet);
+    SqlUtils.close(statement);
+  }
+
+  /** Progress is not tracked for SQL results; always returns 0. */
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Delete.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Delete.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Delete.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Delete.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,52 @@
+
+package org.gora.sql.statement;
+
+/**
+ * A SQL DELETE statement, for generating a Prepared Statement
+ */
+//new API experiment
+public class Delete {
+
+  private String from;
+  private Where where;
+  
+  /**
+   * @return the from
+   */
+  public String from() {
+    return from;
+  }
+
+  /**
+   * @param from the from to set
+   */
+  public Delete from(String from) {
+    this.from = from;
+    return this;
+  }
+  
+  public Delete where(Where where) {
+    this.where = where;
+    return this;
+  }
+  
+  public Where where() {
+    if(where == null) {
+      where = new Where();
+    }
+    return where;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("DELETE FROM ");
+    builder.append(from);
+    
+    if(where != null && !where.isEmpty()) {
+      builder.append(" WHERE ");
+      builder.append(where.toString());
+    }
+    
+    return builder.toString();
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/HSqlInsertUpdateStatement.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/HSqlInsertUpdateStatement.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/HSqlInsertUpdateStatement.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/HSqlInsertUpdateStatement.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,110 @@
+package org.gora.sql.statement;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.gora.persistency.Persistent;
+import org.gora.sql.store.Column;
+import org.gora.sql.store.SqlMapping;
+import org.gora.sql.store.SqlStore;
+
+public class HSqlInsertUpdateStatement<K, T extends Persistent>
+extends InsertUpdateStatement<K, T> {
+
+  public HSqlInsertUpdateStatement(SqlStore<K, T> store, SqlMapping mapping,
+      String tableName) {
+    super(store, mapping, tableName);
+  }
+
+  private String getVariable(String columnName) {
+    return "v_" + columnName;
+  }
+
+  @Override
+  public PreparedStatement toStatement(Connection connection)
+  throws SQLException {
+    int i;
+
+    StringBuilder buf = new StringBuilder("MERGE INTO ");
+    buf.append(tableName).append(" USING (VALUES(");
+
+    i = 0;
+    for (Entry<String, ColumnData> e : columnMap.entrySet()) {
+      Column column = e.getValue().column;
+      if (i != 0) buf.append(",");
+      buf.append("CAST(? AS ");
+      buf.append(column.getJdbcType().toString());
+      if (column.getScaleOrLength() > 0) {
+        buf.append("(").append(column.getScaleOrLength()).append(")");
+      }
+      buf.append(")");
+      i++;
+    }
+    buf.append(")) AS vals(");
+
+    i = 0;
+    for (String columnName : columnMap.keySet()) {
+      if (i != 0) buf.append(",");
+      buf.append(getVariable(columnName));
+      i++;
+    }
+
+    buf.append(") ON ").append(tableName).append(".").append(mapping.getPrimaryColumnName()).append("=vals.");
+    buf.append(getVariable(mapping.getPrimaryColumnName()));
+
+    buf.append(" WHEN MATCHED THEN UPDATE SET ");
+    i = 0;
+    for (String columnName : columnMap.keySet()) {
+      if (columnName.equals(mapping.getPrimaryColumnName())) {
+        continue;
+      }
+      if (i != 0) { buf.append(","); }
+      buf.append(tableName).append(".").append(columnName).append("=vals.");
+      buf.append(getVariable(columnName));
+      i++;
+    }
+
+    buf.append(" WHEN NOT MATCHED THEN INSERT (");
+    i = 0;
+    for (String columnName : columnMap.keySet()) {
+      if (i != 0) { buf.append(","); }
+      buf.append(columnName);
+      i++;
+    }
+    i = 0;
+    buf.append(") VALUES ");
+    for (String columnName : columnMap.keySet()) {
+      if (i != 0) { buf.append(","); }
+      buf.append("vals.").append(getVariable(columnName));
+      i++;
+    }
+
+    Column primaryColumn = mapping.getPrimaryColumn();
+    PreparedStatement insert = connection.prepareStatement(buf.toString());
+    int psIndex = 1;
+    for (Entry<String, ColumnData> e : columnMap.entrySet()) {
+      ColumnData cd = e.getValue();
+      Column column = cd.column;
+      if (column.getName().equals(primaryColumn.getName())) {
+        Object key = columnMap.get(primaryColumn.getName()).object;
+        if (primaryColumn.getScaleOrLength() > 0) {
+          insert.setObject(psIndex++, key,
+              primaryColumn.getJdbcType().getOrder(), primaryColumn.getScaleOrLength());
+        } else {
+          insert.setObject(psIndex++, key, primaryColumn.getJdbcType().getOrder());
+        }
+        continue;
+      }
+      try {
+        store.setObject(insert, psIndex++, cd.object, cd.schema, cd.column);
+      } catch (IOException ex) {
+        throw new SQLException(ex);
+      }
+    }
+
+    return insert;
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertStatement.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertStatement.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertStatement.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertStatement.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,92 @@
+package org.gora.sql.statement;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.gora.sql.store.SqlMapping;
+import org.gora.util.StringUtils;
+
+/**
+ * An SQL INSERT statement, for generating a Prepared Statement.
+ *
+ * <p>{@link #toString()} renders
+ * <code>INSERT INTO table (cols )VALUES (?,...) ON DUPLICATE KEY UPDATE col=?,...</code>
+ * where the UPDATE part lists every column except the primary key column
+ * reported by the {@link SqlMapping}.
+ */
+public class InsertStatement {
+
+  private SqlMapping mapping;
+  private String tableName;
+  /** Column names in the order they were supplied; always a growable list. */
+  private List<String> columnNames;
+
+  /**
+   * Creates a statement without columns; add them with
+   * {@link #addColumnName(String)} or {@link #setColumnNames(String...)}.
+   * @param mapping the mapping used to look up the primary key column name
+   * @param tableName the table to insert into
+   */
+  public InsertStatement(SqlMapping mapping, String tableName) {
+    this.mapping = mapping;
+    this.tableName = tableName;
+    this.columnNames = new ArrayList<String>();
+  }
+
+  /**
+   * Creates a statement for the given columns.
+   * @param mapping the mapping used to look up the primary key column name
+   * @param tableName the table to insert into
+   * @param columnNames the columns to insert, in order
+   */
+  public InsertStatement(SqlMapping mapping, String tableName, String... columnNames) {
+    this.mapping = mapping;
+    this.tableName = tableName;
+    // Arrays.asList returns a fixed-size view; copy it so that
+    // addColumnName() and clear() keep working afterwards.
+    this.columnNames = new ArrayList<String>(Arrays.asList(columnNames));
+  }
+
+  /**
+   * Renders the SQL text of this statement. Rendering does not modify the
+   * statement, so the method may be called repeatedly.
+   * @return the SQL text, terminated with a semicolon
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("INSERT INTO ");
+    builder.append(tableName);
+
+    StringUtils.join(builder.append(" ("), columnNames).append(" )");
+
+    builder.append("VALUES (");
+    for(int i = 0; i < columnNames.size(); i++) {
+      if (i != 0) builder.append(",");
+      builder.append("?");
+    }
+
+    builder.append(") ON DUPLICATE KEY UPDATE ");
+    // The primary key must not appear in the UPDATE part. Work on a copy so
+    // that the column list of this statement is left untouched.
+    List<String> updateColumns = new ArrayList<String>(columnNames);
+    updateColumns.remove(mapping.getPrimaryColumnName());
+    for(int i = 0; i < updateColumns.size(); i++) {
+      if (i != 0) builder.append(",");
+      builder.append(updateColumns.get(i));
+      builder.append("=");
+      builder.append("?");
+    }
+    builder.append(";");
+
+    return builder.toString();
+  }
+
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * @param tableName the tableName to set
+   */
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the columnNames (the live, modifiable list)
+   */
+  public List<String> getColumnNames() {
+    return columnNames;
+  }
+
+  /**
+   * @param columnNames the columnNames to set; replaces all current columns
+   */
+  public void setColumnNames(String... columnNames) {
+    // copy for the same reason as in the constructor: keep the list growable
+    this.columnNames = new ArrayList<String>(Arrays.asList(columnNames));
+  }
+
+  /**
+   * Appends a column to the statement.
+   * @param columnName the column to add
+   */
+  public void addColumnName(String columnName) {
+    this.columnNames.add(columnName);
+  }
+
+  /** Removes all columns from the statement. */
+  public void clear() {
+    this.columnNames.clear();
+  }
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatement.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatement.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatement.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatement.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,49 @@
+package org.gora.sql.statement;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.avro.Schema;
+import org.gora.persistency.Persistent;
+import org.gora.sql.store.Column;
+import org.gora.sql.store.SqlMapping;
+import org.gora.sql.store.SqlStore;
+
+/**
+ * Base class of the vendor specific "insert or update" statements.
+ * Values are collected per column with
+ * {@link #setObject(Object, Schema, Column)} and turned into a JDBC
+ * statement by {@link #toStatement(Connection)}.
+ */
+public abstract class InsertUpdateStatement<K, V extends Persistent> {
+
+  /** A value to be written, together with its schema and target column. */
+  protected class ColumnData {
+    protected Object object;
+    protected Schema schema;
+    protected Column column;
+
+    protected ColumnData(Object object, Schema schema, Column column) {
+      this.column = column;
+      this.schema = schema;
+      this.object = object;
+    }
+  }
+
+  protected SqlStore<K, V> store;
+
+  protected SqlMapping mapping;
+
+  protected String tableName;
+
+  /** Collected values keyed by column name; a TreeMap, so iteration is sorted by name. */
+  protected SortedMap<String, ColumnData> columnMap = new TreeMap<String, ColumnData>();
+
+  public InsertUpdateStatement(SqlStore<K, V> store, SqlMapping mapping, String tableName) {
+    this.tableName = tableName;
+    this.mapping = mapping;
+    this.store = store;
+  }
+
+  /**
+   * Registers the value to write into the given column, replacing any value
+   * registered earlier under the same column name.
+   */
+  public void setObject(Object object, Schema schema, Column column) {
+    final String columnName = column.getName();
+    final ColumnData data = new ColumnData(object, schema, column);
+    columnMap.put(columnName, data);
+  }
+
+  /**
+   * Builds the JDBC statement for the values collected so far.
+   * @param connection the connection used to prepare the statement
+   * @return the prepared statement
+   * @throws SQLException declared for implementations to report failures
+   */
+  public abstract PreparedStatement toStatement(Connection connection)
+  throws SQLException;
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatementFactory.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatementFactory.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatementFactory.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/InsertUpdateStatementFactory.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,23 @@
+package org.gora.sql.statement;
+
+import org.gora.persistency.Persistent;
+import org.gora.sql.store.SqlMapping;
+import org.gora.sql.store.SqlStore;
+import org.gora.sql.store.SqlStore.DBVendor;
+
+/**
+ * Creates the {@link InsertUpdateStatement} implementation matching a
+ * database vendor.
+ */
+public class InsertUpdateStatementFactory {
+
+  /**
+   * @param store the store the statement writes values through
+   * @param mapping the mapping supplying the table name
+   * @param dbVendor the vendor of the target database
+   * @return a new statement for MYSQL or HSQL
+   * @throws RuntimeException for every other vendor
+   */
+  public static <K, T extends Persistent>
+  InsertUpdateStatement<K, T> createStatement(SqlStore<K, T> store,
+      SqlMapping mapping, DBVendor dbVendor) {
+    switch(dbVendor) {
+      case HSQL:
+        return new HSqlInsertUpdateStatement<K, T>(store, mapping, mapping.getTableName());
+      case MYSQL:
+        return new MySqlInsertUpdateStatement<K, T>(store, mapping, mapping.getTableName());
+      default :
+        break;
+    }
+    // GENERIC and any vendor not handled above
+    throw new RuntimeException("Database is not supported yet.");
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/MySqlInsertUpdateStatement.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/MySqlInsertUpdateStatement.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/MySqlInsertUpdateStatement.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/MySqlInsertUpdateStatement.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,89 @@
+package org.gora.sql.statement;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.gora.persistency.Persistent;
+import org.gora.sql.store.Column;
+import org.gora.sql.store.SqlMapping;
+import org.gora.sql.store.SqlStore;
+import org.gora.util.StringUtils;
+
+public class MySqlInsertUpdateStatement<K, V extends Persistent> extends InsertUpdateStatement<K, V> {
+
+  public MySqlInsertUpdateStatement(SqlStore<K, V> store, SqlMapping mapping, String tableName) {
+    super(store, mapping, tableName);
+  }
+
+  @Override
+  public PreparedStatement toStatement(Connection connection)
+  throws SQLException {
+    int i = 0;
+    StringBuilder builder = new StringBuilder("INSERT INTO ");
+    builder.append(tableName);
+    StringUtils.join(builder.append(" ("), columnMap.keySet()).append(" )");
+
+    builder.append("VALUES (");
+    for(i = 0; i < columnMap.size(); i++) {
+      if (i != 0) builder.append(",");
+      builder.append("?");
+    }
+
+    builder.append(") ON DUPLICATE KEY UPDATE ");
+
+    // TODO: Fix this stupid code. We need to make sure primary key field
+    // is not in UPDATE part of sql query. This desperately needs a
+    // better solution
+    Column primaryColumn = mapping.getPrimaryColumn();
+    Object key = columnMap.get(primaryColumn.getName()).object;
+    i = 0;
+    for(String s : columnMap.keySet()) {
+      if (s.equals(primaryColumn.getName())) {
+        continue;
+      }
+      if (i != 0) builder.append(",");
+      builder.append(s).append("=").append("?");
+      i++;
+    }
+    builder.append(";");
+
+    PreparedStatement insert = connection.prepareStatement(builder.toString());
+
+    int psIndex = 1;
+    for (int count = 0; count < 2; count++) {
+      for (Entry<String, ColumnData> e : columnMap.entrySet()) {
+        ColumnData columnData = e.getValue();
+        Column column = columnData.column;
+        Schema fieldSchema = columnData.schema;
+        Object fieldValue = columnData.object;
+
+        // check if primary key
+        if (column.getName().equals(primaryColumn.getName())) {
+          if (count == 1) {
+            continue;
+          }
+          if (primaryColumn.getScaleOrLength() > 0) {
+            insert.setObject(psIndex++, key,
+                primaryColumn.getJdbcType().getOrder(), primaryColumn.getScaleOrLength());
+          } else {
+            insert.setObject(psIndex++, key, primaryColumn.getJdbcType().getOrder());
+          }
+          continue;
+        }
+
+        try {
+          store.setObject(insert, psIndex++, fieldValue, fieldSchema, column);
+        } catch (IOException ex) {
+          throw new SQLException(ex);
+        }
+      }
+    }
+
+    return insert;
+  }
+
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/SelectStatement.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/SelectStatement.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/SelectStatement.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/SelectStatement.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,245 @@
+package org.gora.sql.statement;
+
+import java.util.ArrayList;
+
+import org.gora.util.StringUtils;
+
+/** A SQL SELECT statement */
+public class SelectStatement {
+  
+  private String selectStatement;
+  private ArrayList<String> selectList;
+  private String from;
+  private Where where;
+  private String groupBy;
+  private String having;
+  private String orderBy;
+  private boolean orderByAsc = true; //whether ascending or descending
+  private long offset = -1;
+  private long limit = -1 ;
+  private boolean semicolon = true;
+  
+  public SelectStatement() {
+    this.selectList = new ArrayList<String>();
+  }
+  
+  public SelectStatement(String from) {
+    this();
+    this.from = from;
+  }
+  
+  public SelectStatement(String selectList, String from, String where,
+      String orderBy) {
+    this.selectStatement = selectList;
+    this.from = from;
+    setWhere(where);
+    this.orderBy = orderBy;
+  }
+  
+  public SelectStatement(String selectList, String from, Where where,
+      String groupBy, String having, String orderBy, boolean orderByAsc,
+      int offset, int limit, boolean semicolon) {
+    super();
+    this.selectStatement = selectList;
+    this.from = from;
+    this.where = where;
+    this.groupBy = groupBy;
+    this.having = having;
+    this.orderBy = orderBy;
+    this.orderByAsc = orderByAsc;
+    this.offset = offset;
+    this.limit = limit;
+    this.semicolon = semicolon;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("SELECT ");
+    if(selectStatement != null)
+      builder.append(selectStatement);
+    else
+      StringUtils.join(builder, selectList);
+    append(builder, "FROM", from);
+    append(builder, "WHERE", where);
+    append(builder, "GROUP BY", groupBy);
+    append(builder, "HAVING", having);
+    append(builder, "ORDER BY", orderBy);
+    if(orderBy != null)
+      builder.append(" ").append(orderByAsc?" ASC ":" DESC ");
+    if(limit > 0)
+      builder.append(" LIMIT ").append(limit);
+    if(offset >= 0)
+      builder.append(" OFFSET ").append(offset);
+    if(semicolon)
+      builder.append(";");
+    return builder.toString();
+  }
+  
+  /** Adds a part to the Where clause connected with AND */
+  public void addWhere(String part) {
+    if(where == null)
+      where = new Where();
+    where.addPart(part);
+  }
+  
+  /** Appends the clause if not null */
+  static void append(StringBuilder builder, String sqlClause, Object clause ) {
+    if(clause != null && !clause.toString().equals("")) {
+      builder.append(" ").append(sqlClause).append(" ").append(clause.toString());
+    }
+  }
+
+  public void setSelectStatement(String selectStatement) {
+    this.selectStatement = selectStatement;
+  }
+  
+  public String getSelectStatement() {
+    return selectStatement;
+  }
+
+  public ArrayList<String> getSelectList() {
+    return selectList;
+  }
+  
+  public void setSelectList(ArrayList<String> selectList) {
+    this.selectList = selectList;
+  }
+  
+  public void addToSelectList(String selectField) {
+    selectList.add(selectField);
+  }
+  
+  /**
+   * @return the from
+   */
+  public String getFrom() {
+    return from;
+  }
+
+  /**
+   * @param from the from to set
+   */
+  public void setFrom(String from) {
+    this.from = from;
+  }
+
+  /**
+   * @return the where
+   */
+  public Where getWhere() {
+    return where;
+  }
+
+  /**
+   * @param where the where to set
+   */
+  public void setWhere(Where where) {
+    this.where = where;
+  }
+  
+  /**
+   * @param where the where to set
+   */
+  public void setWhere(String where) {
+    this.where = new Where(where);
+  }
+
+  /**
+   * @return the groupBy
+   */
+  public String getGroupBy() {
+    return groupBy;
+  }
+
+  /**
+   * @param groupBy the groupBy to set
+   */
+  public void setGroupBy(String groupBy) {
+    this.groupBy = groupBy;
+  }
+
+  /**
+   * @return the having
+   */
+  public String getHaving() {
+    return having;
+  }
+
+  /**
+   * @param having the having to set
+   */
+  public void setHaving(String having) {
+    this.having = having;
+  }
+
+  /**
+   * @return the orderBy
+   */
+  public String getOrderBy() {
+    return orderBy;
+  }
+
+  /**
+   * @param orderBy the orderBy to set
+   */
+  public void setOrderBy(String orderBy) {
+    this.orderBy = orderBy;
+  }
+
+  /**
+   * @return the orderByAsc
+   */
+  public boolean isOrderByAsc() {
+    return orderByAsc;
+  }
+
+  /**
+   * @param orderByAsc the orderByAsc to set
+   */
+  public void setOrderByAsc(boolean orderByAsc) {
+    this.orderByAsc = orderByAsc;
+  }
+
+  /**
+   * @return the offset
+   */
+  public long getOffset() {
+    return offset;
+  }
+
+  /**
+   * @param offset the offset to set
+   */
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  /**
+   * @return the limit
+   */
+  public long getLimit() {
+    return limit;
+  }
+
+  /**
+   * @param limit the limit to set
+   */
+  public void setLimit(long limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * @return the semicolon
+   */
+  public boolean isSemicolon() {
+    return semicolon;
+  }
+
+  /**
+   * @param semicolon the semicolon to set
+   */
+  public void setSemicolon(boolean semicolon) {
+    this.semicolon = semicolon;
+  }
+  
+}
\ No newline at end of file

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Where.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Where.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Where.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/statement/Where.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,54 @@
+package org.gora.sql.statement;
+
+/**
+ * The condition part of an SQL statement, kept as text. Conditions added
+ * one after the other are joined with <code>AND</code>.
+ */
+public class Where {
+
+  private StringBuilder builder;
+
+  /** Creates an empty clause. */
+  public Where() {
+    builder = new StringBuilder();
+  }
+
+  /**
+   * Creates a clause from existing text.
+   * @param where the initial condition; <code>null</code> is treated as empty
+   */
+  public Where(String where) {
+    if (where == null) {
+      builder = new StringBuilder("");
+    } else {
+      builder = new StringBuilder(where);
+    }
+  }
+
+  /** Appends a condition, prefixing it with AND unless the clause is still empty */
+  public void addPart(String part) {
+    if (!isEmpty()) {
+      builder.append(" AND ");
+    }
+    builder.append(part);
+  }
+
+  /** Adds <code>name = value</code>. */
+  public void equals(String name, String value) {
+    addComparison(name, "=", value);
+  }
+
+  /** Adds <code>name &lt; value</code>. */
+  public void lessThan(String name, String value) {
+    addComparison(name, "<", value);
+  }
+
+  /** Adds <code>name &lt;= value</code>. */
+  public void lessThanEq(String name, String value) {
+    addComparison(name, "<=", value);
+  }
+
+  /** Adds <code>name &gt; value</code>. */
+  public void greaterThan(String name, String value) {
+    addComparison(name, ">", value);
+  }
+
+  /** Adds <code>name &gt;= value</code>. */
+  public void greaterThanEq(String name, String value) {
+    addComparison(name, ">=", value);
+  }
+
+  /** @return true when no condition has been added yet */
+  public boolean isEmpty() {
+    return builder.length() < 1;
+  }
+
+  /** @return the clause text, without the WHERE keyword */
+  @Override
+  public String toString() {
+    return builder.toString();
+  }
+
+  /** Adds the condition "name operator value", separated by single spaces. */
+  private void addComparison(String name, String operator, String value) {
+    addPart(name + " " + operator + " " + value);
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/Column.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/Column.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/Column.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/Column.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,115 @@
+
+package org.gora.sql.store;
+
+import org.gora.sql.store.SqlTypeInterface.JdbcType;
+
+/**
+ * Describes a column of an SQL table: its name, JDBC and SQL type,
+ * length/scale, whether it is the primary key, and how the value is mapped.
+ */
+public class Column {
+
+  public static enum MappingStrategy {
+    SERIALIZED,
+    JOIN_TABLE,
+    SECONDARY_TABLE,
+  }
+
+  private String tableName;
+  private String name;
+  private JdbcType jdbcType;
+  private String sqlType;
+  private boolean isPrimaryKey;
+  private int length = -1;
+  private int scale = -1;
+  private MappingStrategy mappingStrategy;
+
+  //index, not-null, default-value
+
+  public Column() {
+  }
+
+  /**
+   * @param name the column name
+   */
+  public Column(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Creates a fully described column using the SERIALIZED mapping strategy.
+   * @param name the column name
+   * @param isPrimaryKey whether the column is the primary key
+   * @param jdbcType the JDBC type of the column
+   * @param sqlType the SQL type name; when <code>null</code> the SQL type
+   * of <code>jdbcType</code> is used
+   * @param length the column length
+   * @param scale the column scale
+   */
+  public Column(String name, boolean isPrimaryKey, JdbcType jdbcType, String sqlType
+      , int length, int scale) {
+    this.name = name;
+    this.isPrimaryKey = isPrimaryKey;
+    this.jdbcType = jdbcType;
+    this.length = length;
+    this.scale = scale;
+    this.mappingStrategy = MappingStrategy.SERIALIZED;
+    this.sqlType = sqlType == null ? jdbcType.getSqlType() : sqlType;
+  }
+
+  /**
+   * Same as the six-argument constructor with the SQL type taken from
+   * <code>jdbcType</code>.
+   */
+  public Column(String name, boolean isPrimaryKey, JdbcType jdbcType
+      , int length, int scale) {
+    this(name, isPrimaryKey, jdbcType, null, length, scale);
+  }
+
+  /**
+   * @param name the column name
+   * @param isPrimaryKey whether the column is the primary key
+   */
+  public Column(String name, boolean isPrimaryKey) {
+    this.name = name;
+    // the flag used to be dropped here, leaving isPrimaryKey() always false
+    this.isPrimaryKey = isPrimaryKey;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public JdbcType getJdbcType() {
+    return jdbcType;
+  }
+
+  public void setJdbcType(JdbcType jdbcType) {
+    this.jdbcType = jdbcType;
+  }
+
+  public String getSqlType() {
+    return sqlType;
+  }
+
+  public void setSqlType(String sqlType) {
+    this.sqlType = sqlType;
+  }
+
+  public void setLength(int length) {
+    this.length = length;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  public int getScale() {
+    return scale;
+  }
+
+  public void setScale(int scale) {
+    this.scale = scale;
+  }
+
+  /**
+   * @return the length when it is positive, otherwise the scale
+   */
+  public int getScaleOrLength() {
+    return length > 0 ? length : scale;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public MappingStrategy getMappingStrategy() {
+    return mappingStrategy;
+  }
+
+  public void setMappingStrategy(MappingStrategy mappingStrategy) {
+    this.mappingStrategy = mappingStrategy;
+  }
+
+  public boolean isPrimaryKey() {
+    return isPrimaryKey;
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlMapping.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlMapping.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlMapping.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlMapping.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,55 @@
+
+package org.gora.sql.store;
+
+import java.util.HashMap;
+
+import org.gora.sql.store.SqlTypeInterface.JdbcType;
+
+/**
+ * Holds the mapping of a persistent class onto one SQL table: the table
+ * name, the primary key column and the column of every mapped field.
+ */
+public class SqlMapping {
+
+  private String tableName;
+  /** Columns keyed by field name. */
+  private HashMap<String, Column> fields = new HashMap<String, Column>();
+  private Column primaryColumn;
+
+  public SqlMapping() {
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /** Maps a field onto a column described by its name only. */
+  public void addField(String fieldname, String column) {
+    Column mapped = new Column(column);
+    fields.put(fieldname, mapped);
+  }
+
+  /** Maps a field onto a fully described, non primary key column. */
+  public void addField(String fieldName, String columnName, JdbcType jdbcType,
+      String sqlType, int length, int scale) {
+    Column mapped = new Column(columnName, false, jdbcType, sqlType, length, scale);
+    fields.put(fieldName, mapped);
+  }
+
+  /** @return the column mapped to the field, or null if the field is not mapped */
+  public Column getColumn(String fieldname) {
+    return fields.get(fieldname);
+  }
+
+  /** @return the live map of field names to columns */
+  public HashMap<String, Column> getFields() {
+    return fields;
+  }
+
+  /** Defines the primary key column, replacing any previous definition. */
+  public void setPrimaryKey(String columnName, JdbcType jdbcType,
+      int length, int scale) {
+    this.primaryColumn = new Column(columnName, true, jdbcType, length, scale);
+  }
+
+  public Column getPrimaryColumn() {
+    return primaryColumn;
+  }
+
+  /** @return the name of the primary key column */
+  public String getPrimaryColumnName() {
+    final Column primary = primaryColumn;
+    return primary.getName();
+  }
+}

Added: incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlStore.java
URL: http://svn.apache.org/viewvc/incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlStore.java?rev=1006024&view=auto
==============================================================================
--- incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlStore.java (added)
+++ incubator/gora/trunk/gora-sql/src/main/java/org/gora/sql/store/SqlStore.java Fri Oct  8 21:17:10 2010
@@ -0,0 +1,864 @@
+
+package org.gora.sql.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.ipc.ByteBufferInputStream;
+import org.apache.avro.ipc.ByteBufferOutputStream;
+import org.apache.avro.specific.SpecificFixed;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.gora.persistency.Persistent;
+import org.gora.persistency.StateManager;
+import org.gora.query.PartitionQuery;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.query.impl.PartitionQueryImpl;
+import org.gora.sql.query.SqlQuery;
+import org.gora.sql.query.SqlResult;
+import org.gora.sql.statement.Delete;
+import org.gora.sql.statement.InsertUpdateStatement;
+import org.gora.sql.statement.InsertUpdateStatementFactory;
+import org.gora.sql.statement.SelectStatement;
+import org.gora.sql.statement.Where;
+import org.gora.sql.store.SqlTypeInterface.JdbcType;
+import org.gora.sql.util.SqlUtils;
+import org.gora.store.DataStoreFactory;
+import org.gora.store.impl.DataStoreBase;
+import org.gora.util.AvroUtils;
+import org.gora.util.IOUtils;
+import org.gora.util.StringUtils;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+
+import com.healthmarketscience.sqlbuilder.CreateTableQuery;
+import com.healthmarketscience.sqlbuilder.CreateTableQuery.ColumnConstraint;
+import com.healthmarketscience.sqlbuilder.dbspec.basic.DbSchema;
+import com.healthmarketscience.sqlbuilder.dbspec.basic.DbSpec;
+import com.healthmarketscience.sqlbuilder.dbspec.basic.DbTable;
+
+/**
+ * A DataStore implementation for RDBMS with a SQL interface. SqlStore
+ * uses JDBC drivers to communicate with the DB.
+ */
+public class SqlStore<K, T extends Persistent> extends DataStoreBase<K, T> {
+
+  /** The database vendors the store distinguishes between */
+  public static enum DBVendor {
+    MYSQL,
+    HSQL,
+    GENERIC;
+
+    /**
+     * Derives the vendor from a product name by case-insensitive substring
+     * match: "mysql" is checked first, then "hsql"; anything else is GENERIC.
+     */
+    static DBVendor getVendor(String dbProductName) {
+      final String lowered = dbProductName.toLowerCase();
+      if (lowered.contains("mysql")) {
+        return MYSQL;
+      }
+      return lowered.contains("hsql") ? HSQL : GENERIC;
+    }
+  }
+
+  private static final Log log = LogFactory.getLog(SqlStore.class);
+
+  /** The JDBC Driver class name */
+  protected static final String DRIVER_CLASS_PROPERTY = "jdbc.driver";
+
+  /** JDBC Database access URL */
+  protected static final String URL_PROPERTY = "jdbc.url";
+
+  /** User name to access the database */
+  protected static final String USERNAME_PROPERTY = "jdbc.user";
+
+  /** Password to access the database */
+  protected static final String PASSWORD_PROPERTY = "jdbc.password";
+
+  protected static final String DEFAULT_MAPPING_FILE = "gora-sql-mapping.xml";
+
+  private String jdbcDriverClass;
+  private String jdbcUrl;
+  private String jdbcUsername;
+  private String jdbcPassword;
+
+  private SqlMapping mapping;
+
+  private Connection connection; //no connection pooling yet
+
+  private DatabaseMetaData metadata;
+  private boolean dbMixedCaseIdentifiers, dbLowerCaseIdentifiers, dbUpperCaseIdentifiers;
+  private HashMap<String, JdbcType> dbTypeMap;
+
+  private HashSet<PreparedStatement> writeCache;
+
+  private int keySqlType;
+
+  private DbTable sqlTable;
+
+  private Column primaryColumn;
+
+  private String dbProductName;
+
+  private DBVendor dbVendor;
+
+  /**
+   * Initializes the store: reads the JDBC settings, opens the connection,
+   * loads the mapping and, if requested, creates the table.
+   *
+   * @param keyClass the class of the keys
+   * @param persistentClass the class of the persistent objects
+   * @param properties the properties holding the JDBC settings
+   * @throws IOException if initialization fails
+   */
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass,
+      Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+
+    // JDBC settings; each lookup passes null as the default value
+    jdbcDriverClass = DataStoreFactory.findProperty(properties
+        , this, DRIVER_CLASS_PROPERTY, null);
+    jdbcUrl = DataStoreFactory.findProperty(properties
+        , this, URL_PROPERTY, null);
+    jdbcUsername = DataStoreFactory.findProperty(properties
+        , this, USERNAME_PROPERTY, null);
+    jdbcPassword = DataStoreFactory.findProperty(properties
+        , this, PASSWORD_PROPERTY, null);
+
+    String mappingFile = DataStoreFactory.getMappingFile(properties, this
+        , DEFAULT_MAPPING_FILE);
+
+    // the connection is opened before the metadata is initialized and the
+    // mapping is read
+    // NOTE(review): presumably initDbMetadata() and readMapping() rely on
+    // the open connection / detected vendor -- confirm before reordering
+    connection = getConnection();
+    initDbMetadata();
+
+    mapping = readMapping(mappingFile);
+
+    sqlTable = createSqlTable(mapping);
+
+    // statements collected here are executed as batches by flush()
+    writeCache = new HashSet<PreparedStatement>();
+
+    keySqlType = SqlTypeInterface.getSqlType(keyClass);
+
+    if(autoCreateSchema) {
+      createSchema();
+    }
+
+    this.conf = getOrCreateConf();
+  }
+
+  /** @return the name of the table given by the mapping */
+  @Override
+  public String getSchemaName() {
+    final String tableName = mapping.getTableName();
+    return tableName;
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+    if(connection!=null) {
+      try {
+        connection.commit();
+        if(dbVendor == DBVendor.HSQL && jdbcUrl.contains(":file:")) {
+          connection.prepareStatement("SHUTDOWN").executeUpdate();
+        }
+        connection.close();
+      } catch (SQLException ex) {
+        throw new IOException(ex);
+      }
+    }
+  }
+
+  /** Registers the constraint of the column on the query; columns without a constraint are left alone. */
+  private void setColumnConstraintForQuery(CreateTableQuery query, Column column) {
+    ColumnConstraint columnConstraint = getColumnConstraint(column);
+    if(columnConstraint == null) {
+      return;
+    }
+    query.setColumnConstraint(sqlTable.findColumn(column.getName()), columnConstraint);
+  }
+
+  /**
+   * Creates the table of this store unless it exists already.
+   * @throws IOException if the CREATE TABLE statement fails
+   */
+  @Override
+  public void createSchema() throws IOException {
+    if(schemaExists()) {
+      return;
+    }
+
+    log.info("creating schema: " + sqlTable.getAbsoluteName());
+
+    CreateTableQuery createQuery = new CreateTableQuery(sqlTable, true);
+
+    // primary key first, then every mapped field
+    setColumnConstraintForQuery(createQuery, primaryColumn);
+    for(Column fieldColumn : mapping.getFields().values()) {
+      setColumnConstraintForQuery(createQuery, fieldColumn);
+    }
+
+    PreparedStatement createStatement = null;
+    try {
+      createStatement = connection.prepareStatement(createQuery.validate().toString());
+      createStatement.executeUpdate();
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      SqlUtils.close(createStatement);
+    }
+  }
+
+  /** @return PRIMARY_KEY for a primary key column, null for every other column */
+  private ColumnConstraint getColumnConstraint(Column column) {
+    return column.isPrimaryKey() ? ColumnConstraint.PRIMARY_KEY : null;
+  }
+
+  /**
+   * Flushes pending writes and drops the table of this store if it exists.
+   * @throws IOException if flushing or the DROP TABLE statement fails
+   */
+  @Override
+  public void deleteSchema() throws IOException {
+    flush();
+    if(!schemaExists()) {
+      return;
+    }
+
+    PreparedStatement dropStatement = null;
+    try {
+      log.info("dropping schema:" + sqlTable.getAbsoluteName());
+
+      //DropQuery does not work
+      String dropSql = "DROP TABLE " + sqlTable.getAbsoluteName();
+      dropStatement = connection.prepareStatement(dropSql);
+      dropStatement.executeUpdate();
+
+      connection.commit();
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      SqlUtils.close(dropStatement);
+    }
+  }
+
+  /**
+   * Asks the database metadata whether a table with the mapped name exists.
+   * @return true if the metadata lists at least one matching table
+   * @throws IOException wrapping any exception raised during the lookup
+   */
+  @Override
+  public boolean schemaExists() throws IOException {
+    ResultSet tables = null;
+    try {
+      DatabaseMetaData dbMetadata = connection.getMetaData();
+      String tableName = mapping.getTableName();
+
+      tables = dbMetadata.getTables(null, null, tableName, null);
+      return tables.next();
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      SqlUtils.close(tables);
+    }
+  }
+
+  /**
+   * Deletes the row with the given key.
+   * @param key the key of the row to delete
+   * @return true if at least one row was deleted
+   * @throws IOException if the DELETE statement fails
+   */
+  @Override
+  public boolean delete(K key) throws IOException {
+    Delete deleteByKey = new Delete();
+    deleteByKey.from(sqlTable.getName()).where().equals(primaryColumn.getName(), "?");
+
+    PreparedStatement deleteStatement = null;
+    try {
+      deleteStatement = connection.prepareStatement(deleteByKey.toString());
+      setObject(deleteStatement, 1, key, keySqlType, primaryColumn);
+
+      return deleteStatement.executeUpdate() > 0;
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      SqlUtils.close(deleteStatement);
+    }
+  }
+
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws IOException {
+    Delete delete = new Delete().from(sqlTable.getName());
+    delete.where(constructWhereClause(query));
+
+    PreparedStatement statement = null;
+    try {
+      statement = connection.prepareStatement(delete.toString());
+      setParametersForPreparedStatement(statement, query);
+
+      return statement.executeUpdate();
+
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      SqlUtils.close(statement);
+    }
+  }
+
+  /**
+   * Executes the batches of all cached statements and commits the
+   * connection.
+   *
+   * <p>Execution stops at the first failing batch. All cached statements
+   * are closed and the cache is emptied regardless of the outcome, and in
+   * the failure case the exception is thrown without committing.
+   *
+   * @throws IOException wrapping the SQLException of a failed batch or commit
+   */
+  @Override
+  public void flush() throws IOException {
+    // first batch failure; rethrown only after the cache has been cleaned up
+    Exception deferred = null;
+    synchronized (writeCache) {
+      for(PreparedStatement stmt : writeCache) {
+        try {
+          stmt.executeBatch();
+        } catch (SQLException ex) {
+          deferred = ex;
+          // the remaining statements are not executed
+          break;
+        }
+      }
+      // close every statement, including those skipped after a failure
+      for(PreparedStatement stmt : writeCache) {
+        SqlUtils.close(stmt);
+      }
+      writeCache.clear();
+    }
+    if(deferred != null)
+      throw new IOException(deferred);
+    try {
+      connection.commit();
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Loads the object stored under the given key.
+   *
+   * @param key the primary key value to look up
+   * @param requestFields the fields to load; passed through
+   *        {@code getFieldsToQuery} before use
+   * @return the populated object, or {@code null} if no row matches the key
+   * @throws IOException if the JDBC layer reports an error
+   */
+  @Override
+  public T get(K key, String[] requestFields) throws IOException {
+    requestFields = getFieldsToQuery(requestFields);
+
+    ResultSet resultSet = null;
+    PreparedStatement statement = null;
+    try {
+      Where where = new Where();
+      SelectStatement select = new SelectStatement(mapping.getTableName());
+      select.setWhere(where);
+
+      for (String requestField : requestFields) {
+        Column column = mapping.getColumn(requestField);
+        select.addToSelectList(column.getName());
+      }
+
+      where.equals(primaryColumn.getName(), "?");
+
+      // Use the store's shared connection, as delete() and deleteByQuery()
+      // do. getConnection() opens a brand-new connection on every call, and
+      // this method never closed it, leaking one connection per lookup.
+      statement = connection.prepareStatement(select.toString());
+
+      setObject(statement, 1, key, keySqlType, primaryColumn);
+
+      resultSet = statement.executeQuery();
+
+      if(!resultSet.next()) { //no matching result
+        return null;
+      }
+
+      return readObject(resultSet, newPersistent(), requestFields);
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      SqlUtils.close(resultSet);
+      SqlUtils.close(statement);
+    }
+  }
+
+  /**
+   * Runs the given query and returns a result that iterates over the
+   * matching rows.
+   *
+   * The primary key column is always selected in addition to the requested
+   * fields. A limit is applied only when the query's limit is positive.
+   * On success the open statement and result set are handed to the returned
+   * {@code SqlResult}; on failure they are released here.
+   *
+   * @param query the query to run; its field list is replaced by the result
+   *        of {@code getFieldsToQuery}
+   * @return a result wrapping the open JDBC result set
+   * @throws IOException if the JDBC layer reports an error
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws IOException {
+    query.setFields(getFieldsToQuery(query.getFields()));
+    String[] requestFields = query.getFields();
+
+    Connection queryConnection = null;
+    ResultSet resultSet = null;
+    PreparedStatement statement = null;
+    boolean handedOff = false;
+    try {
+      Where where = constructWhereClause(query);
+      SelectStatement select = new SelectStatement(mapping.getTableName());
+      select.setWhere(where);
+
+      select.addToSelectList(primaryColumn.getName());
+      for (int i = 0; i < requestFields.length; i++) {
+        Column column = mapping.getColumn(requestFields[i]);
+
+        select.addToSelectList(column.getName());
+      }
+
+      if(query.getLimit() > 0) {
+        select.setLimit(query.getLimit());
+      }
+
+      // NOTE(review): getConnection() opens a new connection per call.
+      // Whether SqlResult closes it when the result is closed cannot be
+      // seen from here - confirm, otherwise each query leaks a connection.
+      queryConnection = getConnection();
+      statement = queryConnection.prepareStatement(select.toString());
+
+      setParametersForPreparedStatement(statement, query);
+
+      resultSet = statement.executeQuery();
+
+      Result<K, T> result = new SqlResult<K, T>(this, query, resultSet, statement);
+      handedOff = true;
+      return result;
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    } finally {
+      if (!handedOff) {
+        // Nothing else holds these resources once we fail, so release them
+        // here instead of leaking them.
+        SqlUtils.close(resultSet);
+        SqlUtils.close(statement);
+        if (queryConnection != null) {
+          try {
+            queryConnection.close();
+          } catch (SQLException ignored) {
+            // best effort: the original failure is the one worth reporting
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the WHERE clause for the key constraints of a query.
+   *
+   * An exact key takes precedence and yields a single equality placeholder.
+   * Otherwise a lower and/or upper inclusive bound placeholder is added for
+   * each of start key and end key that is set. The placeholder order matches
+   * the binding order in {@code setParametersForPreparedStatement}.
+   */
+  private Where constructWhereClause(Query<K,T> query) {
+    Where clause = new Where();
+
+    if (query.getKey() != null) {
+      clause.equals(primaryColumn.getName(), "?");
+      return clause;
+    }
+
+    if (query.getStartKey() != null) {
+      clause.greaterThanEq(primaryColumn.getName(), "?");
+    }
+    if (query.getEndKey() != null) {
+      clause.lessThanEq(primaryColumn.getName(), "?");
+    }
+    return clause;
+  }
+
+  /**
+   * Binds the key values of a query to the placeholders produced by
+   * {@code constructWhereClause}: the exact key if present, otherwise the
+   * start key followed by the end key, each only when set.
+   */
+  private void setParametersForPreparedStatement(PreparedStatement statement
+      , Query<K,T> query) throws SQLException, IOException {
+    if (query.getKey() != null) {
+      setObject(statement, 1, query.getKey(), keySqlType, primaryColumn);
+      return;
+    }
+
+    int position = 1;
+    if (query.getStartKey() != null) {
+      setObject(statement, position, query.getStartKey(), keySqlType, primaryColumn);
+      position++;
+    }
+    if (query.getEndKey() != null) {
+      setObject(statement, position, query.getEndKey(), keySqlType, primaryColumn);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public K readPrimaryKey(ResultSet resultSet) throws SQLException {
+    return (K) resultSet.getObject(primaryColumn.getName());
+  }
+
+  public T readObject(ResultSet rs, T persistent
+      , String[] requestFields) throws SQLException, IOException {
+    if(rs == null) {
+      return null;
+    }
+
+    for(int i=0; i<requestFields.length; i++) {
+      String f = requestFields[i];
+      Field field = fieldMap.get(f);
+      Schema fieldSchema = field.schema();
+      Type type = fieldSchema.getType();
+      Column column = mapping.getColumn(field.name());
+      String columnName = column.getName();
+      int columnIndex = rs.findColumn(columnName);
+
+      if (rs.getObject(columnIndex) == null) {
+        continue;
+      }
+      switch(type) {
+        case MAP:
+          readField(rs, columnIndex, persistent.get(field.pos()), fieldSchema, column);
+          break;
+        case ARRAY:
+          readField(rs, columnIndex, persistent.get(field.pos()), fieldSchema, column);
+          break;
+        case BOOLEAN:
+          persistent.put(field.pos(), rs.getBoolean(columnIndex));
+          break;
+        case BYTES:
+          persistent.put(field.pos(), ByteBuffer.wrap(getBytes(rs, columnIndex, fieldSchema, column)));
+          break;
+        case DOUBLE:
+          persistent.put(field.pos(), rs.getDouble(columnIndex));
+          break;
+        case ENUM:
+          Object val = AvroUtils.getEnumValue(fieldSchema, rs.getString(columnIndex));
+          persistent.put(field.pos(), val);
+          break;
+        case FIXED:
+          ((SpecificFixed)persistent.get(i)).bytes(getBytes(rs, columnIndex, fieldSchema, column));
+          break;
+        case FLOAT:
+          persistent.put(field.pos(), rs.getFloat(columnIndex));
+          break;
+        case INT:
+          persistent.put(field.pos(), rs.getInt(columnIndex));
+          break;
+        case LONG:
+          persistent.put(field.pos(), rs.getLong(columnIndex));
+          break;
+        case NULL:
+          break;
+        case RECORD:
+          Object o = readField(rs, columnIndex, persistent.get(field.pos()), fieldSchema, column);
+          persistent.put(field.pos(), o);
+          break;
+        case STRING:
+          persistent.put(field.pos(), new Utf8(rs.getString(columnIndex)));
+          break;
+        case UNION:
+          throw new IOException("Union is not supported yet");
+      }
+      persistent.setDirty(field.pos());
+    }
+    persistent.clearDirty();
+    return persistent;
+  }
+
+  /**
+   * Reads the raw bytes of a binary column, using the access method that
+   * matches the column's declared JDBC type.
+   *
+   * @return the column content, or {@code null} when the column's JDBC type
+   *         is not one of BLOB, BINARY, VARBINARY or LONGVARBINARY
+   */
+  protected byte[] getBytes(ResultSet resultSet, int columnIndex, Schema schema, Column column)
+    throws SQLException, IOException {
+    byte[] content = null;
+    switch(column.getJdbcType()) {
+      case BLOB: {
+        Blob blob = resultSet.getBlob(columnIndex);
+        content = IOUtils.readFully(blob.getBinaryStream());
+        break;
+      }
+      case BINARY:
+      case VARBINARY:
+        content = resultSet.getBytes(columnIndex);
+        break;
+      case LONGVARBINARY:
+        content = IOUtils.readFully(resultSet.getBinaryStream(columnIndex));
+        break;
+      default:
+        break;
+    }
+    return content;
+  }
+
+  /**
+   * Deserializes a serialized field value from a binary column.
+   *
+   * The access method is chosen from the column type reported by the result
+   * set metadata. If the column yields no data, or has another type, the
+   * passed-in {@code field} is returned unchanged.
+   *
+   * @param field the current field value, passed on to the deserializer
+   * @return the deserialized value, or {@code field} when there is nothing
+   *         to read
+   */
+  protected Object readField(ResultSet resultSet, int columnIndex, Object field
+      , Schema schema, Column column) throws SQLException, IOException {
+
+    JdbcType reportedType = JdbcType.get(resultSet.getMetaData().getColumnType(columnIndex));
+
+    switch(reportedType) {
+      case BLOB: {
+        Blob blob = resultSet.getBlob(columnIndex);
+        InputStream blobStream = (blob == null) ? null : blob.getBinaryStream();
+        if (blobStream != null) {
+          return IOUtils.deserialize(blobStream, datumReader, schema, field);
+        }
+        break;
+      }
+      case BINARY:
+      case VARBINARY: {
+        byte[] raw = resultSet.getBytes(columnIndex);
+        if (raw != null) {
+          return IOUtils.deserialize(raw, datumReader, schema, field);
+        }
+        break;
+      }
+      case LONGVARBINARY: {
+        InputStream columnStream = resultSet.getBinaryStream(columnIndex);
+        if (columnStream != null) {
+          return IOUtils.deserialize(columnStream, datumReader, schema, field);
+        }
+        break;
+      }
+      default:
+        break;
+    }
+
+    return field; //field is empty
+  }
+
+  /**
+   * Returns the partitions of the given query. Currently the query is not
+   * split: the list contains a single partition wrapping the whole query.
+   */
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
+  throws IOException {
+    //TODO: implement this using Hadoop DB support
+
+    PartitionQuery<K, T> wholeQuery = new PartitionQueryImpl<K,T>(query);
+
+    List<PartitionQuery<K, T>> result = new ArrayList<PartitionQuery<K, T>>();
+    result.add(wholeQuery);
+    return result;
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    return new SqlQuery<K, T>(this);
+  }
+
+  /**
+   * Queues a write of the dirty fields of the given object under the given
+   * key.
+   *
+   * Nothing is queued when no field is dirty. Otherwise a statement is
+   * built, added to its batch and placed in the write cache; it is not
+   * executed until {@code flush()} is called.
+   *
+   * @throws IOException wrapping any exception raised while building or
+   *         queuing the statement
+   */
+  @Override
+  public void put(K key, T persistent) throws IOException {
+    try {
+      //TODO: INSERT or UPDATE
+
+      Schema schema = persistent.getSchema();
+      StateManager stateManager = persistent.getStateManager();
+      List<Field> fields = schema.getFields();
+
+      InsertUpdateStatement<K, T> insertStatement =
+        InsertUpdateStatementFactory.createStatement(this, mapping, dbVendor);
+
+      // the key is bound first, followed by every dirty field in schema order
+      insertStatement.setObject(key, null, mapping.getPrimaryColumn());
+
+      boolean anyDirty = false;
+      for (int pos = 0; pos < fields.size(); pos++) {
+        Field field = fields.get(pos);
+        if (stateManager.isDirty(persistent, pos)) {
+          anyDirty = true;
+          Column column = mapping.getColumn(field.name());
+          insertStatement.setObject(persistent.get(pos), field.schema(), column);
+        }
+      }
+
+      if (anyDirty) {
+        //jdbc already should cache the ps
+        PreparedStatement insert = insertStatement.toStatement(connection);
+        insert.addBatch();
+        synchronized (writeCache) {
+          writeCache.add(insert);
+        }
+      }
+
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Sets the object to the preparedStatement by it's schema
+   */
+  public void setObject(PreparedStatement statement, int index, Object object
+      , Schema schema, Column column) throws SQLException, IOException {
+
+    Type type = schema.getType();
+
+    switch(type) {
+      case MAP:
+        setField(statement, column, schema, index, object);
+        break;
+      case ARRAY:
+        setField(statement, column, schema, index, object);
+        break;
+      case BOOLEAN:
+        statement.setBoolean(index, (Boolean)object);
+        break;
+      case BYTES:
+        setBytes(statement, column, index, ((ByteBuffer)object).array());
+        break;
+      case DOUBLE:
+        statement.setDouble(index, (Double)object);
+        break;
+      case ENUM:
+        statement.setString(index, ((Enum<?>)object).name());
+        break;
+      case FIXED:
+        setBytes(statement, column, index, ((GenericFixed)object).bytes());
+        break;
+      case FLOAT:
+        statement.setFloat(index, (Float)object);
+        break;
+      case INT:
+        statement.setInt(index, (Integer)object);
+        break;
+      case LONG:
+        statement.setLong(index, (Long)object);
+        break;
+      case NULL:
+        break;
+      case RECORD:
+        setField(statement, column, schema, index, object);
+        break;
+      case STRING:
+        statement.setString(index, ((Utf8)object).toString());
+        break;
+      case UNION:
+        throw new IOException("Union is not supported yet");
+    }
+  }
+
+  /**
+   * Binds a value using an explicit SQL type code, passing the column's
+   * scale-or-length as the final argument of
+   * {@code PreparedStatement.setObject}.
+   */
+  protected <V> void setObject(PreparedStatement statement, int index, V object
+      , int objectType, Column column) throws SQLException, IOException {
+    int scaleOrLength = column.getScaleOrLength();
+    statement.setObject(index, object, objectType, scaleOrLength);
+  }
+
+  /**
+   * Binds a byte array to a statement parameter using the setter that
+   * matches the column's JDBC type. Nothing is bound for column types other
+   * than BLOB, BINARY, VARBINARY and LONGVARBINARY.
+   */
+  protected void setBytes(PreparedStatement statement, Column column, int index, byte[] value)
+  throws SQLException   {
+
+    JdbcType jdbcType = column.getJdbcType();
+    switch(jdbcType) {
+      case BINARY:
+      case VARBINARY:
+        statement.setBytes(index, value);
+        return;
+      case BLOB:
+        statement.setBlob(index, new ByteArrayInputStream(value), value.length);
+        return;
+      case LONGVARBINARY:
+        statement.setBinaryStream(index, new ByteArrayInputStream(value));
+        return;
+      default:
+        // any other column type: the parameter is left unset
+        return;
+    }
+  }
+
+  /**
+   * Serializes the field using Avro and binds the result to a binary
+   * column.
+   *
+   * For BLOB columns the value is written into a blob created on the
+   * store's connection. For BINARY, VARBINARY and LONGVARBINARY columns it
+   * is serialized into an in-memory buffer and bound as bytes or as a
+   * stream.
+   *
+   * @param statement the statement to bind the value on
+   * @param column the target column; its JDBC type selects the binding
+   * @param schema the schema of {@code object}
+   * @param index 1-based parameter index
+   * @param object the value to serialize
+   * @throws IOException if serialization fails or the column's JDBC type
+   *         cannot hold serialized data
+   * @throws SQLException if the JDBC layer reports an error
+   */
+  protected void setField(PreparedStatement statement, Column column, Schema schema
+      , int index, Object object)
+  throws IOException, SQLException {
+
+    JdbcType type = column.getJdbcType();
+
+    switch(type) {
+      case BLOB: {
+        Blob blob = connection.createBlob();
+        OutputStream os = blob.setBinaryStream(1);
+        IOUtils.serialize(os, datumWriter, schema, object);
+        os.close();
+        statement.setBlob(index, blob);
+        break;
+      }
+      case BINARY:
+      case VARBINARY:
+      case LONGVARBINARY: {
+        ByteBufferOutputStream buffer = new ByteBufferOutputStream();
+        OutputStream os = buffer;
+        IOUtils.serialize(os, datumWriter, schema, object);
+        os.close();
+        if (type == JdbcType.LONGVARBINARY) {
+          statement.setBinaryStream(index,
+              new ByteBufferInputStream(buffer.getBufferList()));
+        } else {
+          statement.setBytes(index, IOUtils.getAsBytes(buffer.getBufferList()));
+        }
+        break;
+      }
+      default:
+        // Previously this fell through with a null stream and failed with
+        // a bare NullPointerException during serialization.
+        throw new IOException("Cannot serialize field into column "
+            + column.getName() + ": unsupported JDBC type " + type);
+    }
+  }
+
+  /**
+   * Opens a new JDBC connection with auto-commit disabled.
+   *
+   * The driver class is loaded first. Credentials are passed to the driver
+   * manager only when a non-empty user name is configured. Each call yields
+   * a separate connection.
+   *
+   * @throws IOException wrapping any failure to load the driver or connect
+   */
+  protected Connection getConnection() throws IOException {
+    try {
+      Class.forName(jdbcDriverClass);
+
+      boolean anonymous = jdbcUsername == null || jdbcUsername.length() == 0;
+      Connection fresh = anonymous
+          ? DriverManager.getConnection(jdbcUrl)
+          : DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword);
+
+      fresh.setAutoCommit(false);
+      return fresh;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Reads database metadata from the store's connection: identifier case
+   * handling, product name, vendor, and a map from the database's type names
+   * to JDBC types.
+   *
+   * @throws IOException wrapping the underlying {@code SQLException}
+   */
+  protected void initDbMetadata() throws IOException {
+    ResultSet rs = null;
+    try {
+      metadata = connection.getMetaData();
+
+      dbMixedCaseIdentifiers = metadata.storesMixedCaseIdentifiers();
+      dbLowerCaseIdentifiers = metadata.storesLowerCaseIdentifiers();
+      dbUpperCaseIdentifiers = metadata.storesUpperCaseIdentifiers();
+      dbProductName          = metadata.getDatabaseProductName();
+      dbVendor               = DBVendor.getVendor(dbProductName);
+
+      rs = metadata.getTypeInfo();
+      dbTypeMap = new HashMap<String, JdbcType>();
+
+      while(rs.next()) {
+        JdbcType type = JdbcType.get(rs.getInt("DATA_TYPE"));
+        dbTypeMap.put(rs.getString("TYPE_NAME"), type);
+      }
+
+    } catch (SQLException ex) {
+      // keep the cause: the previous bare IOException hid what went wrong
+      throw new IOException(ex);
+    } finally {
+      // release the type-info result set on the failure path as well
+      SqlUtils.close(rs);
+    }
+  }
+
+  /**
+   * Converts an identifier to the case in which the database stores
+   * identifiers.
+   *
+   * The identifier is returned unchanged when it is {@code null}, when the
+   * database stores mixed-case identifiers, or when neither the lower-case
+   * nor the upper-case flag is set.
+   *
+   * @param identifier the table or column name to normalize; may be
+   *        {@code null}
+   * @return the normalized identifier
+   */
+  protected String getIdentifier(String identifier) {
+    if(identifier == null)
+      return identifier;
+    if(!dbMixedCaseIdentifiers) {
+      // Case conversion must not depend on the JVM's default locale: under
+      // e.g. a Turkish locale "ID" lower-cases to a dotless i, producing an
+      // identifier the database does not know.
+      if(dbLowerCaseIdentifiers) {
+        return identifier.toLowerCase(java.util.Locale.ROOT);
+      }
+      else if(dbUpperCaseIdentifiers) {
+        return identifier.toUpperCase(java.util.Locale.ROOT);
+      }
+    }
+    return identifier;
+  }
+
+  /**
+   * Adds a column definition to the table model. The column's
+   * scale-or-length is passed on only when positive; otherwise
+   * {@code null} is passed as the length.
+   */
+  private void addColumn(DbTable table, Column column) {
+    int declared = column.getScaleOrLength();
+    Integer length = null;
+    if (declared > 0) {
+      length = Integer.valueOf(declared);
+    }
+    table.addColumn(column.getName(), column.getSqlType(), length);
+  }
+
+  /**
+   * Builds the table model for a mapping: a table in a default schema,
+   * holding the primary key column followed by one column per mapped field.
+   */
+  protected DbTable createSqlTable(SqlMapping mapping) {
+    // create default schema
+    DbSchema defaultSchema = new DbSpec().addDefaultSchema();
+    DbTable table = defaultSchema.addTable(mapping.getTableName());
+
+    addColumn(table, primaryColumn);
+    for (Column fieldColumn : mapping.getFields().values()) {
+      addColumn(table, fieldColumn);
+    }
+
+    return table;
+  }
+
+  /**
+   * Registers one mapping element, either the primary key or a regular
+   * field, with the given mapping.
+   *
+   * The JDBC type is taken from the element's {@code jdbc-type} attribute
+   * when present: first looked up among the database's own type names, then
+   * parsed as a JDBC type name. Without that attribute it is derived from
+   * the key class (primary key) or from the field's schema.
+   *
+   * @param mapping the mapping to register with
+   * @param fieldName the field name, or {@code null} for the primary key
+   * @param ele the XML element describing the column
+   * @throws IOException if {@code fieldName} is not a field of the schema
+   */
+  private void addField(SqlMapping mapping, String fieldName, Element ele)
+  throws IOException {
+    String columnName = ele.getAttributeValue("column");
+    String jdbcTypeStr = ele.getAttributeValue("jdbc-type");
+
+    int length = StringUtils.parseInt(ele.getAttributeValue("length"), -1);
+    int scale = StringUtils.parseInt(ele.getAttributeValue("scale"), -1);
+
+    boolean isPrimaryKey = (fieldName == null); // null name implies primary key
+
+    // Step 1: resolve the JDBC type only.
+    JdbcType jdbcType;
+    if (jdbcTypeStr != null) {
+      jdbcType = dbTypeMap.get(jdbcTypeStr);
+      if(jdbcType == null)
+        jdbcType = SqlTypeInterface.stringToJdbcType(jdbcTypeStr);
+    } else if (isPrimaryKey) {
+      jdbcType = SqlTypeInterface.getJdbcType(keyClass, length, scale);
+    } else {
+      Field field = schema.getField(fieldName);
+      if (field == null) {
+        throw new IOException("Mapped field '" + fieldName
+            + "' does not exist in the schema");
+      }
+      jdbcType = SqlTypeInterface.getJdbcType(field.schema(), length, scale);
+    }
+
+    // Step 2: register exactly once. The previous code also registered
+    // inside the branches above, so entries without an explicit jdbc-type
+    // were added to the mapping twice.
+    if (isPrimaryKey) {
+      mapping.setPrimaryKey(columnName, jdbcType, length, scale);
+    } else {
+      mapping.addField(fieldName, columnName, jdbcType, jdbcTypeStr, length, scale);
+    }
+  }
+
+  /**
+   * Reads the SQL mapping for this store's key and persistent classes from
+   * an XML resource on the class path.
+   *
+   * The first {@code class} element whose {@code keyClass} and {@code name}
+   * attributes match the canonical names of the key and persistent classes
+   * is used; its table name, primary key and fields are registered. As a
+   * side effect, {@code primaryColumn} is set from the resulting mapping.
+   *
+   * @param filename name of the mapping resource
+   * @return the mapping; it stays empty if no element matched
+   * @throws IOException wrapping any failure while loading or parsing
+   */
+  @SuppressWarnings("unchecked")
+  protected SqlMapping readMapping(String filename) throws IOException {
+
+    SqlMapping mapping = new SqlMapping();
+
+    try {
+      Document doc = new SAXBuilder().build(
+          getClass().getClassLoader().getResourceAsStream(filename));
+
+      List<Element> classes = doc.getRootElement().getChildren("class");
+
+      for (Element classElement : classes) {
+        // both attributes must match; the name is only checked when the
+        // key class already matches
+        if (!classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())) {
+          continue;
+        }
+        if (!classElement.getAttributeValue("name").equals(
+            persistentClass.getCanonicalName())) {
+          continue;
+        }
+
+        String declaredTable = classElement.getAttributeValue("table");
+        mapping.setTableName(
+            getIdentifier(getSchemaName(declaredTable, persistentClass)));
+
+        // a null field name marks the primary key for addField
+        addField(mapping, null, classElement.getChild("primarykey"));
+
+        List<Element> fieldElements = classElement.getChildren("field");
+        for (Element fieldElement : fieldElements) {
+          addField(mapping, fieldElement.getAttributeValue("name"), fieldElement);
+        }
+
+        break; // only the first matching class element is used
+      }
+
+      primaryColumn = mapping.getPrimaryColumn();
+
+    } catch(Exception ex) {
+      throw new IOException(ex);
+    }
+
+    return mapping;
+  }
+}



Mime
View raw message